diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 913e40b93733..11663642bdd1 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -610,6 +610,7 @@ Druid selects the pod template `podSpecWithHighMemRequests.yaml`. |`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No| |`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s |`P2D`|No| |`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No| +|`druid.indexer.runner.taskJoinTimeout`| `Duration` | Timeout for gathering metadata about existing tasks on startup |`PT1M`|No| |`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time. |`PT1H`|No| |`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No| |`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No| diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index eaef0cba6a15..3b493221cca0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -89,6 +91,8 @@ protected enum State private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; private final TaskStateListener stateListener; + private final SettableFuture taskStartedSuccessfullyFuture; + @MonotonicNonNull private LogWatch logWatch; @@ -108,6 +112,7 @@ protected KubernetesPeonLifecycle( this.taskLogs = taskLogs; this.mapper = mapper; this.stateListener = stateListener; + this.taskStartedSuccessfullyFuture = SettableFuture.create(); } /** @@ -136,11 +141,13 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, launchTimeout, TimeUnit.MILLISECONDS ); - return join(timeout); } catch (Exception e) { log.info("Failed to run task: %s", taskId.getOriginalTaskId()); + if (!taskStartedSuccessfullyFuture.isDone()) { + taskStartedSuccessfullyFuture.set(false); + } throw e; } finally { @@ -178,7 +185,7 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio { try { updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); - + taskStartedSuccessfullyFuture.set(true); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, timeout, @@ -187,6 +194,12 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio return getTaskStatus(jobResponse.getJobDuration()); } + catch (Exception e) { + if (!taskStartedSuccessfullyFuture.isDone()) { + taskStartedSuccessfullyFuture.set(false); + } + throw e; + } finally { try { saveLogs(); @@ -194,7 +207,6 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio catch (Exception e) { log.warn(e, "Log processing failed for task [%s]", taskId); } - stopTask(); } } @@ -245,7 +257,10 @@ protected State getState() protected TaskLocation getTaskLocation() { if (State.PENDING.equals(state.get()) || State.NOT_STARTED.equals(state.get())) { - log.debug("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId()); + /* This should not actually ever happen because KubernetesTaskRunner.start() should not return until all running tasks + have already gone into State.RUNNING, so getTaskLocation should not be called. + */ + log.warn("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId()); return TaskLocation.unknown(); } @@ -256,6 +271,10 @@ protected TaskLocation getTaskLocation() if (taskLocation == null) { Optional maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName()); if (!maybePod.isPresent()) { + /* Arguably we should throw a exception here but leaving it as a warn log to prevent unexpected errors. + If there is strange behavior during overlord restarts the operator should look for this warn log. + */ + log.warn("Could not get task location from k8s for task [%s].", taskId); return TaskLocation.unknown(); } @@ -263,6 +282,7 @@ protected TaskLocation getTaskLocation() PodStatus podStatus = pod.getStatus(); if (podStatus == null || podStatus.getPodIP() == null) { + log.warn("Could not get task location from k8s for task [%s].", taskId); return TaskLocation.unknown(); } taskLocation = TaskLocation.create( @@ -378,4 +398,17 @@ private void updateState(State[] acceptedStates, State targetState) ); stateListener.stateChanged(state.get(), taskId.getOriginalTaskId()); } + + /** + * Retrieves the current {@link ListenableFuture} representing whether the task started successfully + * + *

This future can be used to track whether the task started successfully, with a boolean result + * indicating success (true) or failure (false) when the task starts. + * + * @return a {@link ListenableFuture} representing whether the task started successfully. + */ + protected ListenableFuture getTaskStartedSuccessfullyFuture() + { + return taskStartedSuccessfullyFuture; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index c324b49e13a2..deb1f0b3d9c9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -55,12 +56,14 @@ import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -146,16 +149,28 @@ public Optional streamTaskLog(String taskid, long offset) public ListenableFuture run(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem( + task, + exec.submit(() -> runTask(task)), + peonLifecycleFactory.build( + task, + this::emitTaskStateMetrics + ) + )).getResult(); } } - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem( + task, + exec.submit(() -> joinTask(task)), + peonLifecycleFactory.build( + task, + this::emitTaskStateMetrics + ) + )); } } @@ -173,10 +188,7 @@ private TaskStatus joinTask(Task task) protected TaskStatus doTask(Task task, boolean run) { try { - KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( - task, - this::emitTaskStateMetrics - ); + KubernetesPeonLifecycle peonLifecycle; synchronized (tasks) { KubernetesWorkItem workItem = tasks.get(task.getId()); @@ -185,7 +197,7 @@ protected TaskStatus doTask(Task task, boolean run) throw new ISE("Task [%s] has been shut down", task.getId()); } - workItem.setKubernetesPeonLifecycle(peonLifecycle); + peonLifecycle = workItem.getPeonLifeycle(); } TaskStatus taskStatus; @@ -321,16 +333,53 @@ public List>> restore() public void start() { log.info("Starting K8sTaskRunner..."); - // Load tasks from previously running jobs and wait for their statuses to be updated asynchronously. - for (Job job : client.getPeonJobs()) { + // Load tasks from previously running jobs and wait for their statuses to start running. + final List> taskStatusActiveList = new ArrayList<>(); + final List peonJobs = client.getPeonJobs(); + + log.info("Locating [%,d] active tasks.", peonJobs.size()); + for (Job job : peonJobs) { try { - joinAsync(adapter.toTask(job)); + KubernetesWorkItem kubernetesWorkItem = joinAsync(adapter.toTask(job)); + taskStatusActiveList.add(kubernetesWorkItem.getPeonLifeycle().getTaskStartedSuccessfullyFuture()); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } - log.info("Loaded %,d tasks from previous run", tasks.size()); + + try { + final DateTime nowUtc = DateTimes.nowUtc(); + final long timeoutMs = nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis(); + if (timeoutMs > 0) { + FutureUtils.coalesce(taskStatusActiveList).get(timeoutMs, TimeUnit.MILLISECONDS); + } + log.info("Located [%,d] active tasks.", taskStatusActiveList.size()); + } + catch (Exception e) { + final long numInitialized = + tasks.values() + .stream() + .filter(item -> { + if (item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()) { + try { + return item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().get(); + } + catch (InterruptedException | ExecutionException ex) { + return false; + } + } else { + return false; + } + }).count(); + log.warn( + e, + "Located [%,d] out of [%,d] active tasks (timeout = %s). Locating others asynchronously.", + numInitialized, + taskStatusActiveList.size(), + config.getTaskJoinTimeout() + ); + } cleanupExecutor.scheduleAtFixedRate( () -> @@ -342,7 +391,7 @@ public void start() config.getTaskCleanupInterval().toStandardDuration().getMillis(), TimeUnit.MILLISECONDS ); - log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay()); + log.info("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay()); } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 60efa3c48569..106378f57aa6 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -85,6 +85,12 @@ public class KubernetesTaskRunnerConfig // interval for k8s job cleanup to run private Period taskCleanupInterval = new Period("PT10m"); + @JsonProperty + @NotNull + // how long to wait to join peon k8s jobs on startup + private Period taskJoinTimeout = new Period("PT1M"); + + @JsonProperty @NotNull // how long to wait for the peon k8s job to launch @@ -140,7 +146,8 @@ private KubernetesTaskRunnerConfig( int cpuCoreInMicro, Map labels, Map annotations, - Integer capacity + Integer capacity, + Period taskJoinTimeout ) { this.namespace = namespace; @@ -181,6 +188,10 @@ private KubernetesTaskRunnerConfig( k8sjobLaunchTimeout, this.k8sjobLaunchTimeout ); + this.taskJoinTimeout = ObjectUtils.defaultIfNull( + taskJoinTimeout, + this.taskJoinTimeout + ); this.peonMonitors = ObjectUtils.defaultIfNull( peonMonitors, this.peonMonitors @@ -247,6 +258,11 @@ public Period getTaskTimeout() { return maxTaskDuration; } + public Period getTaskJoinTimeout() + { + return taskJoinTimeout; + } + public Period getTaskCleanupDelay() { @@ -317,6 +333,7 @@ public static class Builder private Map labels; private Map annotations; private Integer capacity; + private Period taskJoinTimeout; public Builder() { @@ -425,6 +442,12 @@ public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity) return this; } + public Builder withTaskJoinTimeout(Period taskJoinTimeout) + { + this.taskJoinTimeout = taskJoinTimeout; + return this; + } + public KubernetesTaskRunnerConfig build() { return new KubernetesTaskRunnerConfig( @@ -444,7 +467,8 @@ public KubernetesTaskRunnerConfig build() this.cpuCoreInMicro, this.labels, this.annotations, - this.capacity + this.capacity, + this.taskJoinTimeout ); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 94d4bbb67f63..5eb55b097b4b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -20,7 +20,6 @@ package org.apache.druid.k8s.overlord; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; @@ -34,27 +33,19 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; - private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; + private final KubernetesPeonLifecycle kubernetesPeonLifecycle; - public KubernetesWorkItem(Task task, ListenableFuture statusFuture) + public KubernetesWorkItem(Task task, ListenableFuture statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle) { super(task.getId(), statusFuture); this.task = task; - } - - protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle) - { - Preconditions.checkState(this.kubernetesPeonLifecycle == null); this.kubernetesPeonLifecycle = kubernetesPeonLifecycle; } protected synchronized void shutdown() { - - if (this.kubernetesPeonLifecycle != null) { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); - } + this.kubernetesPeonLifecycle.startWatchingLogs(); + this.kubernetesPeonLifecycle.shutdown(); } protected boolean isPending() @@ -88,18 +79,12 @@ protected RunnerTaskState getRunnerTaskState() protected Optional streamTaskLogs() { - if (kubernetesPeonLifecycle == null) { - return Optional.absent(); - } return kubernetesPeonLifecycle.streamLogs(); } @Override public TaskLocation getLocation() { - if (kubernetesPeonLifecycle == null) { - return TaskLocation.unknown(); - } return kubernetesPeonLifecycle.getTaskLocation(); } @@ -119,4 +104,9 @@ public Task getTask() { return task; } + + protected KubernetesPeonLifecycle getPeonLifeycle() + { + return this.kubernetesPeonLifecycle; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 59c3700b1fc1..e12e6c7ab72b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -50,6 +50,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -260,6 +261,53 @@ protected synchronized TaskStatus join(long timeout) Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); } + @Test + public void test_run_whenExceptionRaised_setsStartStatusFutureToFalse() throws ExecutionException, InterruptedException + { + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( + task, + kubernetesClient, + taskLogs, + mapper, + stateListener + ) + { + @Override + protected synchronized TaskStatus join(long timeout) + { + throw new IllegalStateException(); + } + }; + + Job job = new JobBuilder().withNewMetadata().withName(ID).endMetadata().build(); + + EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart( + EasyMock.eq(job), + EasyMock.eq(task), + EasyMock.anyLong(), + EasyMock.eq(TimeUnit.MILLISECONDS) + )).andReturn(null); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); + stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID); + EasyMock.expectLastCall().once(); + stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); + EasyMock.expectLastCall().once(); + + replayAll(); + + Assert.assertThrows( + Exception.class, + () -> peonLifecycle.run(job, 0L, 0L, false) + ); + + verifyAll(); + + Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); + Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); + Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().get()); + + } + @Test public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException { @@ -309,6 +357,7 @@ public void test_join() throws IOException stateListener ); + Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); Job job = new JobBuilder() .withNewMetadata() .withName(ID) @@ -345,7 +394,7 @@ public void test_join() throws IOException TaskStatus taskStatus = peonLifecycle.join(0L); verifyAll(); - + Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); Assert.assertEquals(SUCCESS.withDuration(58000), taskStatus); Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 67a5278c6a32..a6c01ee306a5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import org.apache.commons.io.IOUtils; @@ -103,6 +104,8 @@ public void setup() @Test public void test_start_withExistingJobs() throws IOException { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(true); KubernetesTaskRunner runner = new KubernetesTaskRunner( taskAdapter, config, @@ -113,15 +116,16 @@ public void test_start_withExistingJobs() throws IOException ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { return tasks.computeIfAbsent( task.getId(), k -> new KubernetesWorkItem( task, - Futures.immediateFuture(TaskStatus.success(task.getId())) + Futures.immediateFuture(TaskStatus.success(task.getId())), + kubernetesPeonLifecycle ) - ).getResult(); + ); } }; @@ -133,6 +137,67 @@ protected ListenableFuture joinAsync(Task task) EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn( + settableFuture + ); + + replayAll(); + + runner.start(); + + verifyAll(); + + Assert.assertNotNull(runner.tasks); + Assert.assertEquals(1, runner.tasks.size()); + } + + @Test + public void test_start_withExistingJobs_oneJobFails() throws IOException + { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(true); + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ) + { + @Override + protected KubernetesWorkItem joinAsync(Task task) + { + return tasks.computeIfAbsent( + task.getId(), + k -> new KubernetesWorkItem( + task, + Futures.immediateFuture(TaskStatus.success(task.getId())), + kubernetesPeonLifecycle + ) + ); + } + }; + + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + Job job2 = new JobBuilder() + .withNewMetadata() + .withName("id2") + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job, job2)); + EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(taskAdapter.toTask(job2)).andThrow(new IOException("deserialization exception")); + + EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn( + settableFuture + ); replayAll(); @@ -157,10 +222,9 @@ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOE ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null, kubernetesPeonLifecycle)); } }; @@ -193,7 +257,7 @@ public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional() @Test public void test_streamTaskLog_withExistingTask() throws IOException { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected Optional streamTaskLogs() @@ -241,7 +305,7 @@ public void test_run_withoutExistingTask() throws IOException, ExecutionExceptio @Test public void test_run_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -286,8 +350,8 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt replayAll(); - ListenableFuture future = runner.joinAsync(task); - Assert.assertEquals(taskStatus, future.get()); + KubernetesWorkItem workItem = runner.joinAsync(task); + Assert.assertEquals(taskStatus, workItem.getResult().get()); verifyAll(); } @@ -295,7 +359,7 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt @Test public void test_join_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -310,9 +374,9 @@ public void test_join_whenExceptionThrown_throwsRuntimeException() replayAll(); - ListenableFuture future = runner.joinAsync(task); + KubernetesWorkItem workItem = runner.joinAsync(task); - Exception e = Assert.assertThrows(ExecutionException.class, future::get); + Exception e = Assert.assertThrows(ExecutionException.class, () -> workItem.getResult().get()); Assert.assertTrue(e.getCause() instanceof RuntimeException); verifyAll(); @@ -331,7 +395,7 @@ public void test_doTask_whenShutdownRequested_throwsRuntimeException() @Test public void test_shutdown_withExistingTask_removesTaskFromMap() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture, kubernetesPeonLifecycle) { @Override protected synchronized void shutdown() { @@ -348,7 +412,7 @@ protected synchronized void shutdown() @Test public void test_shutdown_withExistingTask_futureIncomplete_removesTaskFromMap() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture, kubernetesPeonLifecycle) { @Override protected synchronized void shutdown() { @@ -385,7 +449,7 @@ public void test_getTotalTaskSlotCount() @Test public void test_getKnownTasks() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); @@ -399,7 +463,7 @@ public void test_getKnownTasks() public void test_getRunningTasks() { Task pendingTask = K8sTestUtils.createTask("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -409,7 +473,7 @@ protected RunnerTaskState getRunnerTaskState() runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = K8sTestUtils.createTask("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -428,7 +492,7 @@ protected RunnerTaskState getRunnerTaskState() public void test_getPendingTasks() { Task pendingTask = K8sTestUtils.createTask("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -438,7 +502,7 @@ protected RunnerTaskState getRunnerTaskState() runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = K8sTestUtils.createTask("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -462,7 +526,7 @@ public void test_getRunnerTaskState_withoutExistingTask_returnsNull() @Test public void test_getRunnerTaskState_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -477,7 +541,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_streamTaskReports_withExistingTask() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -512,7 +576,7 @@ public void test_streamTaskReports_withoutExistingTask_returnsEmptyOptional() th @Test public void test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -529,7 +593,7 @@ public TaskLocation getLocation() @Test public void test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -593,7 +657,7 @@ public InputStream get(long timeout, TimeUnit unit) throws InterruptedException @Test public void test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -618,7 +682,7 @@ public TaskLocation getLocation() @Test public void test_metricsReported_whenTaskStateChange() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -640,7 +704,7 @@ public TaskLocation getLocation() @Test public void test_getTaskLocation_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -657,7 +721,7 @@ public TaskLocation getLocation() @Test public void test_getTaskLocation_throws() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() @@ -689,7 +753,7 @@ public void test_getTotalCapacity() public void test_getUsedCapacity() { Assert.assertEquals(0, runner.getUsedCapacity()); - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); Assert.assertEquals(1, runner.getUsedCapacity()); runner.tasks.remove(task.getId()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 7d17193b1714..fe2b576bccbe 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -45,36 +45,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport public void setup() { task = NoopTask.create(); - workItem = new KubernetesWorkItem(task, null); - } - - @Test - public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() - { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( - task, - null, - null, - null, - null - )); - - Assert.assertThrows( - IllegalStateException.class, - () -> workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( - task, - null, - null, - null, - null - )) - ); - } - - @Test - public void test_shutdown_withoutKubernetesPeonLifecycle() - { - workItem.shutdown(); } @Test @@ -86,7 +56,11 @@ public void test_shutdown_withKubernetesPeonLifecycle() EasyMock.expectLastCall(); replayAll(); - workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); + workItem = new KubernetesWorkItem( + task, + null, + kubernetesPeonLifecycle + ); workItem.shutdown(); verifyAll(); @@ -95,7 +69,7 @@ public void test_shutdown_withKubernetesPeonLifecycle() @Test public void test_isPending_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -108,7 +82,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isPending_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -121,7 +95,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isRunning_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -134,7 +108,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isRunning_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -144,22 +118,17 @@ protected RunnerTaskState getRunnerTaskState() Assert.assertTrue(workItem.isRunning()); } - @Test - public void test_getRunnerTaskState_withoutKubernetesPeonLifecycle_returnsPending() - { - Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); - } - @Test public void test_getRunnerTaskState_withKubernetesPeonLifecycle_returnsPending() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); } @@ -181,7 +150,7 @@ protected State getState() } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); } @@ -203,7 +172,7 @@ protected State getState() } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.RUNNING, workItem.getRunnerTaskState()); } @@ -225,46 +194,36 @@ protected State getState() } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.NONE, workItem.getRunnerTaskState()); } - @Test - public void test_streamTaskLogs_withoutKubernetesPeonLifecycle() - { - Assert.assertFalse(workItem.streamTaskLogs().isPresent()); - } - @Test public void test_streamTaskLogs_withKubernetesPeonLifecycle() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertFalse(workItem.streamTaskLogs().isPresent()); } - @Test - public void test_getLocation_withoutKubernetesPeonLifecycle() - { - Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation()); - } - @Test public void test_getLocation_withKubernetesPeonLifecycle() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation()); } @@ -272,18 +231,28 @@ public void test_getLocation_withKubernetesPeonLifecycle() @Test public void test_getTaskType() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task.getType(), workItem.getTaskType()); } @Test public void test_getDataSource() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task.getDataSource(), workItem.getDataSource()); } @Test public void test_getTask() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task, workItem.getTask()); } + + @Test + public void test_peonLifeycle() + { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); + Assert.assertEquals(kubernetesPeonLifecycle, workItem.getPeonLifeycle()); + } }