From 70a46deba39441052aec2ae81beead75c903b953 Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 4 Nov 2024 14:02:03 -0800 Subject: [PATCH 1/5] Add a wait on start() for task lifecycle to go into running --- .../extensions-contrib/k8s-jobs.md | 1 + .../k8s/overlord/KubernetesPeonLifecycle.java | 27 ++++++++--- .../k8s/overlord/KubernetesTaskRunner.java | 47 ++++++++++++++++--- .../overlord/KubernetesTaskRunnerConfig.java | 28 ++++++++++- .../k8s/overlord/KubernetesWorkItem.java | 8 ++++ .../overlord/KubernetesPeonLifecycleTest.java | 25 +++++----- .../overlord/KubernetesTaskRunnerTest.java | 21 ++++----- 7 files changed, 119 insertions(+), 38 deletions(-) diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 913e40b93733..11663642bdd1 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -610,6 +610,7 @@ Druid selects the pod template `podSpecWithHighMemRequests.yaml`. |`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No| |`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s |`P2D`|No| |`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No| +|`druid.indexer.runner.taskJoinTimeout`| `Duration` | Timeout for gathering metadata about existing tasks on startup |`PT1M`|No| |`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time. |`PT1H`|No| |`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No| |`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No| diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index eaef0cba6a15..08eaa20e5f07 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -72,11 +73,11 @@ public interface TaskStateListener protected enum State { - /** Lifecycle's state before {@link #run(Job, long, long, boolean)} or {@link #join(long)} is called. */ + /** Lifecycle's state before {@link #run(Job, long, long, boolean)} or {@link #join(long, SettableFuture)} is called. */ NOT_STARTED, /** Lifecycle's state since {@link #run(Job, long, long, boolean)} is called. */ PENDING, - /** Lifecycle's state since {@link #join(long)} is called. */ + /** Lifecycle's state since {@link #join(long, SettableFuture)} is called. */ RUNNING, /** Lifecycle's state since the task has completed. */ STOPPED @@ -89,6 +90,7 @@ protected enum State private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; private final TaskStateListener stateListener; + @MonotonicNonNull private LogWatch logWatch; @@ -137,7 +139,7 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, TimeUnit.MILLISECONDS ); - return join(timeout); + return join(timeout, null); } catch (Exception e) { log.info("Failed to run task: %s", taskId.getOriginalTaskId()); @@ -174,11 +176,13 @@ private void writeTaskPayload(Task task) throws IOException * @return * @throws IllegalStateException */ - protected synchronized TaskStatus join(long timeout) throws IllegalStateException + protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) throws IllegalStateException { try { updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); - + if (taskActiveStatusFuture != null) { + taskActiveStatusFuture.set(true); + } JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, timeout, @@ -189,6 +193,9 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio } finally { try { + if (taskActiveStatusFuture != null) { + taskActiveStatusFuture.set(false); + } saveLogs(); } catch (Exception e) { @@ -245,7 +252,10 @@ protected State getState() protected TaskLocation getTaskLocation() { if (State.PENDING.equals(state.get()) || State.NOT_STARTED.equals(state.get())) { - log.debug("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId()); + /* This should not actually ever happen because KubernetesTaskRunner.start() should not return until all running tasks + have already gone into State.RUNNING, so getTaskLocation should not be called. + */ + log.warn("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId()); return TaskLocation.unknown(); } @@ -256,6 +266,10 @@ protected TaskLocation getTaskLocation() if (taskLocation == null) { Optional maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName()); if (!maybePod.isPresent()) { + /* Arguably we should throw a exception here but leaving it as a warn log to prevent unexpected errors. + If there is strange behavior during overlord restarts the operator should look for this warn log. + */ + log.warn("Could not get task location from k8s."); return TaskLocation.unknown(); } @@ -263,6 +277,7 @@ protected TaskLocation getTaskLocation() PodStatus podStatus = pod.getStatus(); if (podStatus == null || podStatus.getPodIP() == null) { + log.warn("Could not get task location from k8s."); return TaskLocation.unknown(); } taskLocation = TaskLocation.create( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index c324b49e13a2..ba3bc662a550 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -28,7 +28,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -55,12 +57,14 @@ import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -151,11 +155,10 @@ public ListenableFuture run(Task task) } } - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))); } } @@ -178,6 +181,7 @@ protected TaskStatus doTask(Task task, boolean run) this::emitTaskStateMetrics ); + SettableFuture taskActiveStatusFuture; synchronized (tasks) { KubernetesWorkItem workItem = tasks.get(task.getId()); @@ -186,6 +190,7 @@ protected TaskStatus doTask(Task task, boolean run) } workItem.setKubernetesPeonLifecycle(peonLifecycle); + taskActiveStatusFuture = workItem.getTaskActiveStatusFuture(); } TaskStatus taskStatus; @@ -198,7 +203,9 @@ protected TaskStatus doTask(Task task, boolean run) ); } else { taskStatus = peonLifecycle.join( - config.getTaskTimeout().toStandardDuration().getMillis() + config.getTaskTimeout().toStandardDuration().getMillis(), + taskActiveStatusFuture + ); } @@ -321,15 +328,41 @@ public List>> restore() public void start() { log.info("Starting K8sTaskRunner..."); - // Load tasks from previously running jobs and wait for their statuses to be updated asynchronously. - for (Job job : client.getPeonJobs()) { + // Load tasks from previously running jobs and wait for their statuses to start running. + final List> taskStatusActiveList = new ArrayList<>(); + final List peonJobs = client.getPeonJobs(); + + log.info("Locating [%,d] active tasks.", peonJobs.size()); + for (Job job : peonJobs) { try { - joinAsync(adapter.toTask(job)); + KubernetesWorkItem kubernetesWorkItem = joinAsync(adapter.toTask(job)); + taskStatusActiveList.add(kubernetesWorkItem.getTaskActiveStatusFuture()); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } + + try { + final DateTime nowUtc = DateTimes.nowUtc(); + final long timeoutMs = nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis(); + if (timeoutMs > 0) { + FutureUtils.coalesce(taskStatusActiveList).get(timeoutMs, TimeUnit.MILLISECONDS); + } + log.info("Located [%,d] active tasks.", taskStatusActiveList.size()); + } + catch (Exception e) { + final long numInitialized = + tasks.values().stream().filter(item -> item.getTaskActiveStatusFuture().isDone()).count(); + log.warn( + e, + "Located [%,d] out of [%,d] active tasks (timeout = %s). Locating others asynchronously.", + numInitialized, + taskStatusActiveList.size(), + config.getTaskJoinTimeout() + ); + } + log.info("Loaded %,d tasks from previous run", tasks.size()); cleanupExecutor.scheduleAtFixedRate( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 60efa3c48569..106378f57aa6 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -85,6 +85,12 @@ public class KubernetesTaskRunnerConfig // interval for k8s job cleanup to run private Period taskCleanupInterval = new Period("PT10m"); + @JsonProperty + @NotNull + // how long to wait to join peon k8s jobs on startup + private Period taskJoinTimeout = new Period("PT1M"); + + @JsonProperty @NotNull // how long to wait for the peon k8s job to launch @@ -140,7 +146,8 @@ private KubernetesTaskRunnerConfig( int cpuCoreInMicro, Map labels, Map annotations, - Integer capacity + Integer capacity, + Period taskJoinTimeout ) { this.namespace = namespace; @@ -181,6 +188,10 @@ private KubernetesTaskRunnerConfig( k8sjobLaunchTimeout, this.k8sjobLaunchTimeout ); + this.taskJoinTimeout = ObjectUtils.defaultIfNull( + taskJoinTimeout, + this.taskJoinTimeout + ); this.peonMonitors = ObjectUtils.defaultIfNull( peonMonitors, this.peonMonitors @@ -247,6 +258,11 @@ public Period getTaskTimeout() { return maxTaskDuration; } + public Period getTaskJoinTimeout() + { + return taskJoinTimeout; + } + public Period getTaskCleanupDelay() { @@ -317,6 +333,7 @@ public static class Builder private Map labels; private Map annotations; private Integer capacity; + private Period taskJoinTimeout; public Builder() { @@ -425,6 +442,12 @@ public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity) return this; } + public Builder withTaskJoinTimeout(Period taskJoinTimeout) + { + this.taskJoinTimeout = taskJoinTimeout; + return this; + } + public KubernetesTaskRunnerConfig build() { return new KubernetesTaskRunnerConfig( @@ -444,7 +467,8 @@ public KubernetesTaskRunnerConfig build() this.cpuCoreInMicro, this.labels, this.annotations, - this.capacity + this.capacity, + this.taskJoinTimeout ); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 94d4bbb67f63..3cf10938156b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -22,6 +22,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -35,11 +36,13 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; + private final SettableFuture taskActiveStatusFuture; public KubernetesWorkItem(Task task, ListenableFuture statusFuture) { super(task.getId(), statusFuture); this.task = task; + this.taskActiveStatusFuture = SettableFuture.create(); } protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle) @@ -119,4 +122,9 @@ public Task getTask() { return task; } + + public SettableFuture getTaskActiveStatusFuture() + { + return taskActiveStatusFuture; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 59c3700b1fc1..ef7e22c68db6 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -90,7 +91,7 @@ public void test_run() throws IOException ) { @Override - protected synchronized TaskStatus join(long timeout) + protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) { return TaskStatus.success(ID); } @@ -135,7 +136,7 @@ public void test_run_useTaskManager() throws IOException ) { @Override - protected synchronized TaskStatus join(long timeout) + protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) { return TaskStatus.success(ID); } @@ -179,7 +180,7 @@ public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() throw ) { @Override - protected synchronized TaskStatus join(long timeout) + protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) { return TaskStatus.success(ID); } @@ -228,7 +229,7 @@ public void test_run_whenExceptionRaised_setsRunnerTaskStateToNone() ) { @Override - protected synchronized TaskStatus join(long timeout) + protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) { throw new IllegalStateException(); } @@ -288,7 +289,7 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L); + TaskStatus taskStatus = peonLifecycle.join(0L, null); verifyAll(); @@ -342,7 +343,7 @@ public void test_join() throws IOException replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L); + TaskStatus taskStatus = peonLifecycle.join(0L, null); verifyAll(); @@ -396,12 +397,12 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L); + TaskStatus taskStatus = peonLifecycle.join(0L, null); Assert.assertThrows( "Task [id] failed to join: invalid peon lifecycle state transition [STOPPED]->[PENDING]", IllegalStateException.class, - () -> peonLifecycle.join(0L) + () -> peonLifecycle.join(0L, null) ); verifyAll(); @@ -448,7 +449,7 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L); + TaskStatus taskStatus = peonLifecycle.join(0L, null); verifyAll(); @@ -498,7 +499,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L); + TaskStatus taskStatus = peonLifecycle.join(0L, null); verifyAll(); @@ -550,7 +551,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L); + TaskStatus taskStatus = peonLifecycle.join(0L, null); verifyAll(); @@ -590,7 +591,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr replayAll(); - Assert.assertThrows(RuntimeException.class, () -> peonLifecycle.join(0L)); + Assert.assertThrows(RuntimeException.class, () -> peonLifecycle.join(0L, null)); verifyAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 67a5278c6a32..53100111bde1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -113,7 +113,7 @@ public void test_start_withExistingJobs() throws IOException ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { return tasks.computeIfAbsent( task.getId(), @@ -121,7 +121,7 @@ protected ListenableFuture joinAsync(Task task) task, Futures.immediateFuture(TaskStatus.success(task.getId())) ) - ).getResult(); + ); } }; @@ -157,10 +157,9 @@ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOE ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)); } }; @@ -282,12 +281,12 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt { TaskStatus taskStatus = TaskStatus.success(task.getId()); - EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andReturn(taskStatus); + EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(taskStatus); replayAll(); - ListenableFuture future = runner.joinAsync(task); - Assert.assertEquals(taskStatus, future.get()); + KubernetesWorkItem workItem = runner.joinAsync(task); + Assert.assertEquals(taskStatus, workItem.getResult().get()); verifyAll(); } @@ -306,13 +305,13 @@ public void test_join_withExistingTask_returnsExistingWorkItem() @Test public void test_join_whenExceptionThrown_throwsRuntimeException() { - EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andThrow(new IllegalStateException()); + EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong(), EasyMock.anyObject())).andThrow(new IllegalStateException()); replayAll(); - ListenableFuture future = runner.joinAsync(task); + KubernetesWorkItem workItem = runner.joinAsync(task); - Exception e = Assert.assertThrows(ExecutionException.class, future::get); + Exception e = Assert.assertThrows(ExecutionException.class, () -> workItem.getResult().get()); Assert.assertTrue(e.getCause() instanceof RuntimeException); verifyAll(); From 4f044feff0e328ed59e235502a026ffedb5107ca Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 4 Nov 2024 14:14:12 -0800 Subject: [PATCH 2/5] handle exceptions --- .../druid/k8s/overlord/KubernetesPeonLifecycle.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 08eaa20e5f07..7238c24227bb 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -191,17 +191,19 @@ protected synchronized TaskStatus join(long timeout, SettableFuture tas return getTaskStatus(jobResponse.getJobDuration()); } + catch (Exception e) { + if (taskActiveStatusFuture != null) { + taskActiveStatusFuture.set(false); + } + throw e; + } finally { try { - if (taskActiveStatusFuture != null) { - taskActiveStatusFuture.set(false); - } saveLogs(); } catch (Exception e) { log.warn(e, "Log processing failed for task [%s]", taskId); } - stopTask(); } } From 33c0faba3d80d0602f2ab0b0dcb288bc09bbc056 Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 6 Nov 2024 12:28:04 -0800 Subject: [PATCH 3/5] Fix logging messages --- .../apache/druid/k8s/overlord/KubernetesPeonLifecycle.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 7238c24227bb..47aaab8769c2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -271,7 +271,7 @@ protected TaskLocation getTaskLocation() /* Arguably we should throw a exception here but leaving it as a warn log to prevent unexpected errors. If there is strange behavior during overlord restarts the operator should look for this warn log. */ - log.warn("Could not get task location from k8s."); + log.warn("Could not get task location from k8s for task [%s].", taskId); return TaskLocation.unknown(); } @@ -279,7 +279,7 @@ protected TaskLocation getTaskLocation() PodStatus podStatus = pod.getStatus(); if (podStatus == null || podStatus.getPodIP() == null) { - log.warn("Could not get task location from k8s."); + log.warn("Could not get task location from k8s for task [%s].", taskId); return TaskLocation.unknown(); } taskLocation = TaskLocation.create( From dff8873a565c1df2a8b30422efe5e297ff7c956a Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 6 Nov 2024 16:12:53 -0800 Subject: [PATCH 4/5] Don't pass in the settable future as a arg --- .../k8s/overlord/KubernetesPeonLifecycle.java | 36 +++++-- .../k8s/overlord/KubernetesTaskRunner.java | 41 ++++---- .../k8s/overlord/KubernetesWorkItem.java | 30 ++---- .../overlord/KubernetesPeonLifecycleTest.java | 28 +++--- .../overlord/KubernetesTaskRunnerTest.java | 53 ++++++----- .../k8s/overlord/KubernetesWorkItemTest.java | 93 +++++++------------ 6 files changed, 129 insertions(+), 152 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 47aaab8769c2..87c51ae4dfce 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; @@ -73,11 +74,11 @@ public interface TaskStateListener protected enum State { - /** Lifecycle's state before {@link #run(Job, long, long, boolean)} or {@link #join(long, SettableFuture)} is called. */ + /** Lifecycle's state before {@link #run(Job, long, long, boolean)} or {@link #join(long)} is called. */ NOT_STARTED, /** Lifecycle's state since {@link #run(Job, long, long, boolean)} is called. */ PENDING, - /** Lifecycle's state since {@link #join(long, SettableFuture)} is called. */ + /** Lifecycle's state since {@link #join(long)} is called. */ RUNNING, /** Lifecycle's state since the task has completed. */ STOPPED @@ -90,6 +91,7 @@ protected enum State private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; private final TaskStateListener stateListener; + private final SettableFuture taskStartedSuccessfullyFuture; @MonotonicNonNull private LogWatch logWatch; @@ -110,6 +112,7 @@ protected KubernetesPeonLifecycle( this.taskLogs = taskLogs; this.mapper = mapper; this.stateListener = stateListener; + this.taskStartedSuccessfullyFuture = SettableFuture.create(); } /** @@ -138,14 +141,16 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, launchTimeout, TimeUnit.MILLISECONDS ); - - return join(timeout, null); + return join(timeout); } catch (Exception e) { log.info("Failed to run task: %s", taskId.getOriginalTaskId()); throw e; } finally { + if (!taskStartedSuccessfullyFuture.isDone()) { + taskStartedSuccessfullyFuture.set(false); + } stopTask(); } } @@ -176,13 +181,11 @@ private void writeTaskPayload(Task task) throws IOException * @return * @throws IllegalStateException */ - protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) throws IllegalStateException + protected synchronized TaskStatus join(long timeout) throws IllegalStateException { try { updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); - if (taskActiveStatusFuture != null) { - taskActiveStatusFuture.set(true); - } + taskStartedSuccessfullyFuture.set(true); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, timeout, @@ -192,8 +195,8 @@ protected synchronized TaskStatus join(long timeout, SettableFuture tas return getTaskStatus(jobResponse.getJobDuration()); } catch (Exception e) { - if (taskActiveStatusFuture != null) { - taskActiveStatusFuture.set(false); + if (!taskStartedSuccessfullyFuture.isDone()) { + taskStartedSuccessfullyFuture.set(false); } throw e; } @@ -395,4 +398,17 @@ private void updateState(State[] acceptedStates, State targetState) ); stateListener.stateChanged(state.get(), taskId.getOriginalTaskId()); } + + /** + * Retrieves the current {@link ListenableFuture} representing whether the task started successfully + * + *

This future can be used to track whether the task started successfully, with a boolean result + * indicating success (true) or failure (false) when the task starts. + * + * @return a {@link ListenableFuture} representing whether the task started successfully. + */ + protected ListenableFuture getTaskStartedSuccessfullyFuture() + { + return taskStartedSuccessfullyFuture; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index ba3bc662a550..c94b2f633ed5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -28,7 +28,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.batch.v1.Job; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.RunnerTaskState; @@ -150,15 +149,28 @@ public Optional streamTaskLog(String taskid, long offset) public ListenableFuture run(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem( + task, + exec.submit(() -> runTask(task)), + peonLifecycleFactory.build( + task, + this::emitTaskStateMetrics + ) + )).getResult(); } } protected KubernetesWorkItem joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem( + task, + exec.submit(() -> joinTask(task)), + peonLifecycleFactory.build( + task, + this::emitTaskStateMetrics + ) + )); } } @@ -176,12 +188,8 @@ private TaskStatus joinTask(Task task) protected TaskStatus doTask(Task task, boolean run) { try { - KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( - task, - this::emitTaskStateMetrics - ); + KubernetesPeonLifecycle peonLifecycle; - SettableFuture taskActiveStatusFuture; synchronized (tasks) { KubernetesWorkItem workItem = tasks.get(task.getId()); @@ -189,8 +197,7 @@ protected TaskStatus doTask(Task task, boolean run) throw new ISE("Task [%s] has been shut down", task.getId()); } - workItem.setKubernetesPeonLifecycle(peonLifecycle); - taskActiveStatusFuture = workItem.getTaskActiveStatusFuture(); + peonLifecycle = workItem.getPeonLifeycle(); } TaskStatus taskStatus; @@ -203,9 +210,7 @@ protected TaskStatus doTask(Task task, boolean run) ); } else { taskStatus = peonLifecycle.join( - config.getTaskTimeout().toStandardDuration().getMillis(), - taskActiveStatusFuture - + config.getTaskTimeout().toStandardDuration().getMillis() ); } @@ -336,7 +341,7 @@ public void start() for (Job job : peonJobs) { try { KubernetesWorkItem kubernetesWorkItem = joinAsync(adapter.toTask(job)); - taskStatusActiveList.add(kubernetesWorkItem.getTaskActiveStatusFuture()); + taskStatusActiveList.add(kubernetesWorkItem.getPeonLifeycle().getTaskStartedSuccessfullyFuture()); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); @@ -353,7 +358,7 @@ public void start() } catch (Exception e) { final long numInitialized = - tasks.values().stream().filter(item -> item.getTaskActiveStatusFuture().isDone()).count(); + tasks.values().stream().filter(item -> item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()).count(); log.warn( e, "Located [%,d] out of [%,d] active tasks (timeout = %s). Locating others asynchronously.", @@ -363,8 +368,6 @@ public void start() ); } - log.info("Loaded %,d tasks from previous run", tasks.size()); - cleanupExecutor.scheduleAtFixedRate( () -> client.deleteCompletedPeonJobsOlderThan( @@ -375,7 +378,7 @@ public void start() config.getTaskCleanupInterval().toStandardDuration().getMillis(), TimeUnit.MILLISECONDS ); - log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay()); + log.info("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay()); } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 3cf10938156b..5eb55b097b4b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -20,9 +20,7 @@ package org.apache.druid.k8s.overlord; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -35,29 +33,19 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; - private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; - private final SettableFuture taskActiveStatusFuture; + private final KubernetesPeonLifecycle kubernetesPeonLifecycle; - public KubernetesWorkItem(Task task, ListenableFuture statusFuture) + public KubernetesWorkItem(Task task, ListenableFuture statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle) { super(task.getId(), statusFuture); this.task = task; - this.taskActiveStatusFuture = SettableFuture.create(); - } - - protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle) - { - Preconditions.checkState(this.kubernetesPeonLifecycle == null); this.kubernetesPeonLifecycle = kubernetesPeonLifecycle; } protected synchronized void shutdown() { - - if (this.kubernetesPeonLifecycle != null) { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); - } + this.kubernetesPeonLifecycle.startWatchingLogs(); + this.kubernetesPeonLifecycle.shutdown(); } protected boolean isPending() @@ -91,18 +79,12 @@ protected RunnerTaskState getRunnerTaskState() protected Optional streamTaskLogs() { - if (kubernetesPeonLifecycle == null) { - return Optional.absent(); - } return kubernetesPeonLifecycle.streamLogs(); } @Override public TaskLocation getLocation() { - if (kubernetesPeonLifecycle == null) { - return TaskLocation.unknown(); - } return kubernetesPeonLifecycle.getTaskLocation(); } @@ -123,8 +105,8 @@ public Task getTask() return task; } - public SettableFuture getTaskActiveStatusFuture() + protected KubernetesPeonLifecycle getPeonLifeycle() { - return taskActiveStatusFuture; + return this.kubernetesPeonLifecycle; } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index ef7e22c68db6..5c956dd905c2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; -import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -91,7 +90,7 @@ public void test_run() throws IOException ) { @Override - protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) + protected synchronized TaskStatus join(long timeout) { return TaskStatus.success(ID); } @@ -136,7 +135,7 @@ public void test_run_useTaskManager() throws IOException ) { @Override - protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) + protected synchronized TaskStatus join(long timeout) { return TaskStatus.success(ID); } @@ -180,7 +179,7 @@ public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() throw ) { @Override - protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) + protected synchronized TaskStatus join(long timeout) { return TaskStatus.success(ID); } @@ -229,7 +228,7 @@ public void test_run_whenExceptionRaised_setsRunnerTaskStateToNone() ) { @Override - protected synchronized TaskStatus join(long timeout, SettableFuture taskActiveStatusFuture) + protected synchronized TaskStatus join(long timeout) { throw new IllegalStateException(); } @@ -289,7 +288,7 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L, null); + TaskStatus taskStatus = peonLifecycle.join(0L); verifyAll(); @@ -310,6 +309,7 @@ public void test_join() throws IOException stateListener ); + Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); Job job = new JobBuilder() .withNewMetadata() .withName(ID) @@ -343,10 +343,10 @@ public void test_join() throws IOException replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L, null); + TaskStatus taskStatus = peonLifecycle.join(0L); verifyAll(); - + Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); Assert.assertEquals(SUCCESS.withDuration(58000), taskStatus); Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); } @@ -397,12 +397,12 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L, null); + TaskStatus taskStatus = peonLifecycle.join(0L); Assert.assertThrows( "Task [id] failed to join: invalid peon lifecycle state transition [STOPPED]->[PENDING]", IllegalStateException.class, - () -> peonLifecycle.join(0L, null) + () -> peonLifecycle.join(0L) ); verifyAll(); @@ -449,7 +449,7 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L, null); + TaskStatus taskStatus = peonLifecycle.join(0L); verifyAll(); @@ -499,7 +499,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L, null); + TaskStatus taskStatus = peonLifecycle.join(0L); verifyAll(); @@ -551,7 +551,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th replayAll(); - TaskStatus taskStatus = peonLifecycle.join(0L, null); + TaskStatus taskStatus = peonLifecycle.join(0L); verifyAll(); @@ -591,7 +591,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr replayAll(); - Assert.assertThrows(RuntimeException.class, () -> peonLifecycle.join(0L, null)); + Assert.assertThrows(RuntimeException.class, () -> peonLifecycle.join(0L)); verifyAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 53100111bde1..7c68b9c1d923 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import org.apache.commons.io.IOUtils; @@ -103,6 +104,8 @@ public void setup() @Test public void test_start_withExistingJobs() throws IOException { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(true); KubernetesTaskRunner runner = new KubernetesTaskRunner( taskAdapter, config, @@ -119,7 +122,8 @@ protected KubernetesWorkItem joinAsync(Task task) task.getId(), k -> new KubernetesWorkItem( task, - Futures.immediateFuture(TaskStatus.success(task.getId())) + Futures.immediateFuture(TaskStatus.success(task.getId())), + kubernetesPeonLifecycle ) ); } @@ -133,6 +137,9 @@ protected KubernetesWorkItem joinAsync(Task task) EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn( + settableFuture + ); replayAll(); @@ -159,7 +166,7 @@ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOE @Override protected KubernetesWorkItem joinAsync(Task task) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null, kubernetesPeonLifecycle)); } }; @@ -192,7 +199,7 @@ public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional() @Test public void test_streamTaskLog_withExistingTask() throws IOException { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected Optional streamTaskLogs() @@ -240,7 +247,7 @@ public void test_run_withoutExistingTask() throws IOException, ExecutionExceptio @Test public void test_run_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -281,7 +288,7 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt { TaskStatus taskStatus = TaskStatus.success(task.getId()); - EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(taskStatus); + EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andReturn(taskStatus); replayAll(); @@ -294,7 +301,7 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt @Test public void test_join_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -305,7 +312,7 @@ public void test_join_withExistingTask_returnsExistingWorkItem() @Test public void test_join_whenExceptionThrown_throwsRuntimeException() { - EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong(), EasyMock.anyObject())).andThrow(new IllegalStateException()); + EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andThrow(new IllegalStateException()); replayAll(); @@ -330,7 +337,7 @@ public void test_doTask_whenShutdownRequested_throwsRuntimeException() @Test public void test_shutdown_withExistingTask_removesTaskFromMap() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture, kubernetesPeonLifecycle) { @Override protected synchronized void shutdown() { @@ -347,7 +354,7 @@ protected synchronized void shutdown() @Test public void test_shutdown_withExistingTask_futureIncomplete_removesTaskFromMap() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture, kubernetesPeonLifecycle) { @Override protected synchronized void shutdown() { @@ -384,7 +391,7 @@ public void test_getTotalTaskSlotCount() @Test public void test_getKnownTasks() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); @@ -398,7 +405,7 @@ public void test_getKnownTasks() public void test_getRunningTasks() { Task pendingTask = K8sTestUtils.createTask("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -408,7 +415,7 @@ protected RunnerTaskState getRunnerTaskState() runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = K8sTestUtils.createTask("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -427,7 +434,7 @@ protected RunnerTaskState getRunnerTaskState() public void test_getPendingTasks() { Task pendingTask = K8sTestUtils.createTask("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -437,7 +444,7 @@ protected RunnerTaskState getRunnerTaskState() runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = K8sTestUtils.createTask("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -461,7 +468,7 @@ public void test_getRunnerTaskState_withoutExistingTask_returnsNull() @Test public void test_getRunnerTaskState_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -476,7 +483,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_streamTaskReports_withExistingTask() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -511,7 +518,7 @@ public void test_streamTaskReports_withoutExistingTask_returnsEmptyOptional() th @Test public void test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -528,7 +535,7 @@ public TaskLocation getLocation() @Test public void test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -592,7 +599,7 @@ public InputStream get(long timeout, TimeUnit unit) throws InterruptedException @Test public void test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -617,7 +624,7 @@ public TaskLocation getLocation() @Test public void test_metricsReported_whenTaskStateChange() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -639,7 +646,7 @@ public TaskLocation getLocation() @Test public void test_getTaskLocation_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -656,7 +663,7 @@ public TaskLocation getLocation() @Test public void test_getTaskLocation_throws() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() @@ -688,7 +695,7 @@ public void test_getTotalCapacity() public void test_getUsedCapacity() { Assert.assertEquals(0, runner.getUsedCapacity()); - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); Assert.assertEquals(1, runner.getUsedCapacity()); runner.tasks.remove(task.getId()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 7d17193b1714..fe2b576bccbe 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -45,36 +45,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport public void setup() { task = NoopTask.create(); - workItem = new KubernetesWorkItem(task, null); - } - - @Test - public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() - { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( - task, - null, - null, - null, - null - )); - - Assert.assertThrows( - IllegalStateException.class, - () -> workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( - task, - null, - null, - null, - null - )) - ); - } - - @Test - public void test_shutdown_withoutKubernetesPeonLifecycle() - { - workItem.shutdown(); } @Test @@ -86,7 +56,11 @@ public void test_shutdown_withKubernetesPeonLifecycle() EasyMock.expectLastCall(); replayAll(); - workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); + workItem = new KubernetesWorkItem( + task, + null, + kubernetesPeonLifecycle + ); workItem.shutdown(); verifyAll(); @@ -95,7 +69,7 @@ public void test_shutdown_withKubernetesPeonLifecycle() @Test public void test_isPending_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -108,7 +82,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isPending_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -121,7 +95,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isRunning_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -134,7 +108,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isRunning_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -144,22 +118,17 @@ protected RunnerTaskState getRunnerTaskState() Assert.assertTrue(workItem.isRunning()); } - @Test - public void test_getRunnerTaskState_withoutKubernetesPeonLifecycle_returnsPending() - { - Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); - } - @Test public void test_getRunnerTaskState_withKubernetesPeonLifecycle_returnsPending() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); } @@ -181,7 +150,7 @@ protected State getState() } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); } @@ -203,7 +172,7 @@ protected State getState() } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.RUNNING, workItem.getRunnerTaskState()); } @@ -225,46 +194,36 @@ protected State getState() } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.NONE, workItem.getRunnerTaskState()); } - @Test - public void test_streamTaskLogs_withoutKubernetesPeonLifecycle() - { - Assert.assertFalse(workItem.streamTaskLogs().isPresent()); - } - @Test public void test_streamTaskLogs_withKubernetesPeonLifecycle() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertFalse(workItem.streamTaskLogs().isPresent()); } - @Test - public void test_getLocation_withoutKubernetesPeonLifecycle() - { - Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation()); - } - @Test public void test_getLocation_withKubernetesPeonLifecycle() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation()); } @@ -272,18 +231,28 @@ public void test_getLocation_withKubernetesPeonLifecycle() @Test public void test_getTaskType() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task.getType(), workItem.getTaskType()); } @Test public void test_getDataSource() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task.getDataSource(), workItem.getDataSource()); } @Test public void test_getTask() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task, workItem.getTask()); } + + @Test + public void test_peonLifeycle() + { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); + Assert.assertEquals(kubernetesPeonLifecycle, workItem.getPeonLifeycle()); + } } From 2aa92f7b17a54be912fb83da1e3b249db4211354 Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 7 Nov 2024 09:39:02 -0800 Subject: [PATCH 5/5] add some unit tests --- .../k8s/overlord/KubernetesPeonLifecycle.java | 6 +- .../k8s/overlord/KubernetesTaskRunner.java | 15 ++++- .../overlord/KubernetesPeonLifecycleTest.java | 48 +++++++++++++++ .../overlord/KubernetesTaskRunnerTest.java | 58 +++++++++++++++++++ 4 files changed, 123 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 87c51ae4dfce..3b493221cca0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -145,12 +145,12 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, } catch (Exception e) { log.info("Failed to run task: %s", taskId.getOriginalTaskId()); - throw e; - } - finally { if (!taskStartedSuccessfullyFuture.isDone()) { taskStartedSuccessfullyFuture.set(false); } + throw e; + } + finally { stopTask(); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index c94b2f633ed5..deb1f0b3d9c9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -358,7 +358,20 @@ public void start() } catch (Exception e) { final long numInitialized = - tasks.values().stream().filter(item -> item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()).count(); + tasks.values() + .stream() + .filter(item -> { + if (item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()) { + try { + return item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().get(); + } + catch (InterruptedException | ExecutionException ex) { + return false; + } + } else { + return false; + } + }).count(); log.warn( e, "Located [%,d] out of [%,d] active tasks (timeout = %s). Locating others asynchronously.", diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 5c956dd905c2..e12e6c7ab72b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -50,6 +50,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -260,6 +261,53 @@ protected synchronized TaskStatus join(long timeout) Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); } + @Test + public void test_run_whenExceptionRaised_setsStartStatusFutureToFalse() throws ExecutionException, InterruptedException + { + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( + task, + kubernetesClient, + taskLogs, + mapper, + stateListener + ) + { + @Override + protected synchronized TaskStatus join(long timeout) + { + throw new IllegalStateException(); + } + }; + + Job job = new JobBuilder().withNewMetadata().withName(ID).endMetadata().build(); + + EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart( + EasyMock.eq(job), + EasyMock.eq(task), + EasyMock.anyLong(), + EasyMock.eq(TimeUnit.MILLISECONDS) + )).andReturn(null); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); + stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID); + EasyMock.expectLastCall().once(); + stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); + EasyMock.expectLastCall().once(); + + replayAll(); + + Assert.assertThrows( + Exception.class, + () -> peonLifecycle.run(job, 0L, 0L, false) + ); + + verifyAll(); + + Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); + Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); + Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().get()); + + } + @Test public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 7c68b9c1d923..a6c01ee306a5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -151,6 +151,64 @@ protected KubernetesWorkItem joinAsync(Task task) Assert.assertEquals(1, runner.tasks.size()); } + @Test + public void test_start_withExistingJobs_oneJobFails() throws IOException + { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(true); + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ) + { + @Override + protected KubernetesWorkItem joinAsync(Task task) + { + return tasks.computeIfAbsent( + task.getId(), + k -> new KubernetesWorkItem( + task, + Futures.immediateFuture(TaskStatus.success(task.getId())), + kubernetesPeonLifecycle + ) + ); + } + }; + + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + Job job2 = new JobBuilder() + .withNewMetadata() + .withName("id2") + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job, job2)); + EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(taskAdapter.toTask(job2)).andThrow(new IOException("deserialization exception")); + + EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn( + settableFuture + ); + + replayAll(); + + runner.start(); + + verifyAll(); + + Assert.assertNotNull(runner.tasks); + Assert.assertEquals(1, runner.tasks.size()); + } + @Test public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOException {