From aa805885a13cdb2d6ac464b8cc48a189f1e776e0 Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 31 Mar 2023 16:36:07 -0400 Subject: [PATCH 01/10] Fix issues with null pointers on jobResponse --- .../k8s/overlord/common/DruidKubernetesPeonClient.java | 3 ++- .../org/apache/druid/k8s/overlord/common/JobResponse.java | 7 ++++--- .../apache/druid/k8s/overlord/common/JobResponseTest.java | 8 ++++++++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java index e60b3b6f8eea..56e87a2c18fb 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java @@ -111,7 +111,8 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit ); if (job == null) { - return new JobResponse(job, PeonPhase.FAILED); + log.info("K8s job for task was not found %s", taskId); + return new JobResponse(null, PeonPhase.FAILED); } if (job.getStatus().getSucceeded() != null) { return new JobResponse(job, PeonPhase.SUCCEEDED); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java index 6f39944951fa..77ef7e3ad278 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java @@ -54,6 +54,7 @@ public PeonPhase getPhase() public Optional getJobDuration() { Optional duration = Optional.absent(); + String jobName = job != null && job.getMetadata() != null ? job.getMetadata().getName() : ""; try { if (job != null && job.getStatus() != null && job.getStatus().getStartTime() != null @@ -66,12 +67,12 @@ public Optional getJobDuration() } } catch (Exception e) { - LOGGER.error(e, "Error calculating duration for job: %s", job.getMetadata().getName()); + LOGGER.error(e, "Error calculating duration for job: %s", jobName); } if (duration.isPresent()) { - LOGGER.info("Duration for Job: %s was %d seconds", job.getMetadata().getName(), duration.get()); + LOGGER.info("Duration for Job: %s was %d seconds", jobName, duration.get()); } else { - LOGGER.info("Unable to calcuate duration for Job: %s", job.getMetadata().getName()); + LOGGER.info("Unable to calcuate duration for Job: %s", jobName); } return duration; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java index 76a73c4a0b3a..5f6328ce5486 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java @@ -75,4 +75,12 @@ void testMakingCodeCoverageHappy() Optional duration = response.getJobDuration(); Assertions.assertFalse(duration.isPresent()); } + + @Test + void testNullJob() + { + JobResponse response = new JobResponse(null, PeonPhase.SUCCEEDED); + Optional duration = response.getJobDuration(); + Assertions.assertFalse(duration.isPresent()); + } } From 437d998349e117bd0a1ad2d1116e0c04e56b961f Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 3 Apr 2023 18:23:01 -0400 Subject: [PATCH 02/10] fix unit tests --- .../k8s/overlord/KubernetesTaskRunner.java | 5 ++ .../overlord/KubernetesTaskRunnerTest.java | 58 ++++++++++++++++++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 5b76329cdde3..7d8b539dc9d0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -180,6 +180,11 @@ public ListenableFuture run(Task task) TaskStatus status; if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) { status = TaskStatus.success(task.getId()); + } else if (completedPhase.getJob() == null) { + status = TaskStatus.failure( + task.getId(), + "Task was deleted before completion: " + k8sTaskId + ); } else { status = TaskStatus.failure( task.getId(), diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 64a7f2fc0a89..b7fecb9d8f36 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -38,6 +38,7 @@ import org.apache.druid.guice.FirehoseModule; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.IndexTask; @@ -694,6 +695,62 @@ public void testWorkItemGetLocation_withKubernetesResourceNotFoundException_retu assertEquals(TaskLocation.unknown(), location); } + @Test + public void testK8sJobManualShutdown() throws Exception + { + Task task = makeTask(); + K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); + + Job job = mock(Job.class); + ObjectMeta jobMetadata = mock(ObjectMeta.class); + when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId()); + JobStatus status = mock(JobStatus.class); + when(status.getActive()).thenReturn(1); + when(job.getStatus()).thenReturn(status); + when(job.getMetadata()).thenReturn(jobMetadata); + + Pod peonPod = mock(Pod.class); + ObjectMeta metadata = mock(ObjectMeta.class); + when(metadata.getName()).thenReturn("peonPodName"); + when(metadata.getCreationTimestamp()).thenReturn(DateTimes.nowUtc().toString()); + when(peonPod.getMetadata()).thenReturn(metadata); + PodStatus podStatus = mock(PodStatus.class); + when(podStatus.getPodIP()).thenReturn("SomeIP"); + when(peonPod.getStatus()).thenReturn(podStatus); + + K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); + when(adapter.fromTask(eq(task))).thenReturn(job); + when(adapter.toTask(eq(peonPod))).thenReturn(task); + + DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); + + when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); + when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); + + // Client returns a null job if the job has been deleted + when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse( + null, + PeonPhase.FAILED + )); + when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent()); + when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); + + KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + adapter, + kubernetesTaskRunnerConfig, + taskQueueConfig, + taskLogPusher, + peonClient, + null + ); + KubernetesTaskRunner spyRunner = spy(taskRunner); + ListenableFuture future = spyRunner.run(task); + TaskStatus taskStatus = future.get(); + Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); + Assert.assertEquals("Task was deleted before completion: [ k8sTaskId, k8staskid]", taskStatus.getErrorMsg()); + + } + private Task makeTask() { return new TestableNoopTask( @@ -711,7 +768,6 @@ private Task makeTask() ) ); } - private static class TestableNoopTask extends NoopTask { TestableNoopTask( From 51315d33da14df1d5f7f27f42d21b3b4647b241f Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Mon, 3 Apr 2023 18:53:53 -0400 Subject: [PATCH 03/10] Update extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> --- .../druid/k8s/overlord/common/DruidKubernetesPeonClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java index 56e87a2c18fb..eed751a96f7a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java @@ -111,7 +111,7 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit ); if (job == null) { - log.info("K8s job for task was not found %s", taskId); + log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId); return new JobResponse(null, PeonPhase.FAILED); } if (job.getStatus().getSucceeded() != null) { From 1b249fe3ffa19775e6aad7ecbe2748c71a560fdb Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 3 Apr 2023 18:59:27 -0400 Subject: [PATCH 04/10] nullable --- .../org/apache/druid/k8s/overlord/common/JobResponse.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java index 77ef7e3ad278..9254f34225ac 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java @@ -26,6 +26,8 @@ import org.joda.time.Period; import org.joda.time.PeriodType; +import javax.annotation.Nullable; + public class JobResponse { @@ -35,7 +37,7 @@ public class JobResponse private final Job job; private final PeonPhase phase; - public JobResponse(Job job, PeonPhase phase) + public JobResponse(@Nullable Job job, PeonPhase phase) { this.job = job; this.phase = phase; From 359bbee401a3e5adad658836986af91e30736993 Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 4 Apr 2023 10:43:49 -0400 Subject: [PATCH 05/10] fix error message --- .../org/apache/druid/k8s/overlord/KubernetesTaskRunner.java | 2 +- .../org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 7d8b539dc9d0..3341d3f0018a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -183,7 +183,7 @@ public ListenableFuture run(Task task) } else if (completedPhase.getJob() == null) { status = TaskStatus.failure( task.getId(), - "Task was deleted before completion: " + k8sTaskId + "K8s Job for task disappeared before completion: " + k8sTaskId ); } else { status = TaskStatus.failure( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index b7fecb9d8f36..256619812e33 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -747,7 +747,7 @@ public void testK8sJobManualShutdown() throws Exception ListenableFuture future = spyRunner.run(task); TaskStatus taskStatus = future.get(); Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); - Assert.assertEquals("Task was deleted before completion: [ k8sTaskId, k8staskid]", taskStatus.getErrorMsg()); + Assert.assertEquals("K8s Job for task disappeared before completion: [ k8sTaskId, k8staskid]", taskStatus.getErrorMsg()); } From 2e0a9400229b5b781a55d170615b0c6e904953b9 Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 4 Apr 2023 16:48:19 -0400 Subject: [PATCH 06/10] Use jobs for known tasks instead of pods --- .../k8s/overlord/KubernetesTaskRunner.java | 12 +++-- .../k8s/overlord/common/K8sTaskAdapter.java | 4 +- .../common/PodTemplateTaskAdapter.java | 8 ++-- .../k8s/overlord/common/TaskAdapter.java | 3 +- .../overlord/KubernetesTaskRunnerTest.java | 10 ++-- .../DruidPeonClientIntegrationTest.java | 2 +- .../overlord/common/K8sTaskAdapterTest.java | 20 ++++---- .../common/PodTemplateTaskAdapterTest.java | 35 ++++++++------ .../src/test/resources/basePod.yaml | 46 ++++++++++--------- .../resources/basePodWithoutAnnotations.yaml | 41 +++++++++-------- 10 files changed, 98 insertions(+), 83 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 3341d3f0018a..b5077543b89f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -24,7 +24,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -80,6 +79,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Runs tasks as k8s jobs using the "internal peon" verb. @@ -338,7 +338,9 @@ public Map getTotalTaskSlotCount() public Collection getKnownTasks() { List result = new ArrayList<>(); - for (Pod existingTask : client.listPeonPods()) { + log.info("Get known tasks"); + for (Job existingTask : client.listAllPeonJobs()) { + log.info(existingTask.getMetadata().getName()); try { Task task = adapter.toTask(existingTask); ListenableFuture future = run(task); @@ -425,7 +427,11 @@ public void registerListener(TaskRunnerListener listener, Executor executor) public Collection getRunningTasks() { List result = new ArrayList<>(); - for (Pod existingTask : client.listPeonPods(Sets.newHashSet(PeonPhase.RUNNING))) { + log.info("Get running tasks"); + for (Job existingTask : client.listAllPeonJobs().stream() + .filter(job -> job.getStatus() != null && job.getStatus().getActive() != null && job.getStatus().getActive() > 0).collect(Collectors.toSet()) + ) { + log.info(existingTask.getMetadata().getName()); try { Task task = adapter.toTask(existingTask); ListenableFuture future = run(task); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java index bfc87d7c8676..b899f36e4969 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java @@ -117,9 +117,9 @@ public Job fromTask(Task task) throws IOException } @Override - public Task toTask(Pod from) throws IOException + public Task toTask(Job from) throws IOException { - PodSpec podSpec = from.getSpec(); + PodSpec podSpec = from.getSpec().getTemplate().getSpec(); massageSpec(podSpec, "main"); List envVars = podSpec.getContainers().get(0).getEnv(); Optional taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java index e4aad7b3d94d..ad79ba669542 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java @@ -156,15 +156,15 @@ public Job fromTask(Task task) throws IOException * @throws IOException */ @Override - public Task toTask(Pod from) throws IOException + public Task toTask(Job from) throws IOException { - Map annotations = from.getMetadata().getAnnotations(); + Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); if (annotations == null) { - throw new IOE("No annotations found on pod [%s]", from.getMetadata().getName()); + throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); } String task = annotations.get(DruidK8sConstants.TASK); if (task == null) { - throw new IOE("No task annotation found on pod [%s]", from.getMetadata().getName()); + throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName()); } return mapper.readValue(Base64Compression.decompressBase64(task), Task.class); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java index 00263fdc3b9b..a58240e40aed 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java @@ -19,7 +19,6 @@ package org.apache.druid.k8s.overlord.common; -import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.batch.v1.Job; import org.apache.druid.indexing.common.task.Task; @@ -30,6 +29,6 @@ public interface TaskAdapter Job fromTask(Task task) throws IOException; - Task toTask(Pod from) throws IOException; + Task toTask(Job from) throws IOException; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 256619812e33..6c651f0cbdea 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -262,11 +262,11 @@ public void testTheK8sRestartState() throws Exception K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); when(adapter.fromTask(eq(task))).thenReturn(job); - when(adapter.toTask(eq(peonPod))).thenReturn(task); + when(adapter.toTask(eq(job))).thenReturn(task); DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); - when(peonClient.listPeonPods()).thenReturn(Collections.singletonList(peonPod)); + when(peonClient.listAllPeonJobs()).thenReturn(Collections.singletonList(job)); when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); @@ -325,11 +325,11 @@ public void testTheK8sRestartStateAndHandleJobsThatAlreadyCompletedWhileDown() t K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); when(adapter.fromTask(eq(task))).thenReturn(job); - when(adapter.toTask(eq(peonPod))).thenReturn(task); + when(adapter.toTask(eq(job))).thenReturn(task); DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); - when(peonClient.listPeonPods()).thenReturn(Collections.singletonList(peonPod)); + when(peonClient.listAllPeonJobs()).thenReturn(Collections.singletonList(job)); when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); @@ -720,7 +720,7 @@ public void testK8sJobManualShutdown() throws Exception K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); when(adapter.fromTask(eq(task))).thenReturn(job); - when(adapter.toTask(eq(peonPod))).thenReturn(task); + when(adapter.toTask(eq(job))).thenReturn(task); DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java index 72cd8d41ec66..733e91b50d3a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java @@ -160,7 +160,7 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception thread.start(); // assert that the env variable is corret - Task taskFromEnvVar = adapter.toTask(peonClient.getMainJobPod(new K8sTaskId(task.getId()))); + Task taskFromEnvVar = adapter.toTask(job); assertEquals(task, taskFromEnvVar); // now copy the task.json file from the pod and make sure its the same as our task.json we expected diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java index fe9775868edd..e37c549d83da 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java @@ -27,11 +27,10 @@ import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobList; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import org.apache.commons.lang.StringUtils; @@ -160,22 +159,19 @@ public void serializingAndDeserializingATask() throws IOException task, new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/")) ); - - // cant launch jobs with test server, we have to hack around this. - Pod pod = K8sTestUtils.createPodFromJob(jobFromSpec); - client.pods().inNamespace("test").create(pod); - PodList podList = client.pods().inNamespace("test").list(); - assertEquals(1, podList.getItems().size()); + client.batch().v1().jobs().inNamespace("test").create(jobFromSpec); + JobList jobList = client.batch().v1().jobs().inNamespace("test").list(); + assertEquals(1, jobList.getItems().size()); // assert that the size of the pod is 1g - Pod myPod = Iterables.getOnlyElement(podList.getItems()); - Quantity containerMemory = myPod.getSpec().getContainers().get(0).getResources().getLimits().get("memory"); + Job myJob = Iterables.getOnlyElement(jobList.getItems()); + Quantity containerMemory = myJob.getSpec().getTemplate().getSpec().getContainers().get(0).getResources().getLimits().get("memory"); String amount = containerMemory.getAmount(); assertEquals(2400000000L, Long.valueOf(amount)); assertTrue(StringUtils.isBlank(containerMemory.getFormat())); // no units specified we talk in bytes - Task taskFromPod = adapter.toTask(Iterables.getOnlyElement(podList.getItems())); - assertEquals(task, taskFromPod); + Task taskFromJob = adapter.toTask(Iterables.getOnlyElement(jobList.getItems())); + assertEquals(task, taskFromJob); } @Test diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java index 7c9dad46072b..0a1f845697ef 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java @@ -21,10 +21,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import org.apache.druid.indexing.common.TestUtils; @@ -315,15 +314,17 @@ public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException props ); - Pod pod = client - .pods() + Job job = client + .batch() + .v1() + .jobs() .load(this.getClass() .getClassLoader() .getResourceAsStream("basePodWithoutAnnotations.yaml") ) .get(); - Assert.assertThrows(IOE.class, () -> adapter.toTask(pod)); + Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); } @Test @@ -344,21 +345,25 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException props ); - Pod basePod = client - .pods() + Job baseJob = client + .batch() + .v1() + .jobs() .load(this.getClass() .getClassLoader() .getResourceAsStream("basePodWithoutAnnotations.yaml") ) .get(); - - Pod pod = new PodBuilder(basePod) + Job job = new JobBuilder(baseJob) + .editSpec() + .editTemplate() .editMetadata() .addToAnnotations(Collections.emptyMap()) .endMetadata() + .endTemplate() + .endSpec() .build(); - - Assert.assertThrows(IOE.class, () -> adapter.toTask(pod)); + Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); } @Test @@ -379,15 +384,17 @@ public void test_fromTask() throws IOException props ); - Pod pod = client - .pods() + Job job = client + .batch() + .v1() + .jobs() .load(this.getClass() .getClassLoader() .getResourceAsStream("basePod.yaml") ) .get(); - Task actual = adapter.toTask(pod); + Task actual = adapter.toTask(job); Task expected = NoopTask.create("id", 1); Assertions.assertEquals(expected, actual); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml index 5c8c4f7855ea..eb5f46cb3e2a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml @@ -1,23 +1,27 @@ -apiVersion: v1 -kind: Pod +apiVersion: batch/v1 +kind: Job metadata: - name: "id-kmwkw" - labels: - job-name: "id" - druid.k8s.peons: "true" - annotations: - task: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA=" + name: "id" spec: - containers: - - command: - - sleep - - "3600" - env: - - name: "TASK_DIR" - value: "/tmp/id" - - name: "TASK_JSON" - valueFrom: - fieldRef: - fieldPath: "metadata.annotations['task']" - image: one - name: primary \ No newline at end of file + template: + metadata: + labels: + job-name: "id" + druid.k8s.peons: "true" + annotations: + task: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA=" + name: id-kmwkw + spec: + containers: + - command: + - sleep + - "3600" + env: + - name: "TASK_DIR" + value: "/tmp/id" + - name: "TASK_JSON" + valueFrom: + fieldRef: + fieldPath: "metadata.annotations['task']" + image: one + name: primary diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml index c56ba7c9b6a3..9b1ad233fce3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml @@ -1,21 +1,24 @@ -apiVersion: v1 -kind: Pod +apiVersion: batch/v1 +kind: Job metadata: - name: "id-kmwkw" - labels: - job-name: "id" - annotations: + name: "id" spec: - containers: - - command: - - sleep - - "3600" - env: - - name: "TASK_DIR" - value: "/tmp/id" - - name: "TASK_JSON" - valueFrom: - fieldRef: - fieldPath: "metadata.annotations['task']" - image: one - name: primary \ No newline at end of file + template: + metadata: + labels: + job-name: id + name: id-kmwkw + spec: + containers: + - command: + - sleep + - "3600" + env: + - name: "TASK_DIR" + value: "/tmp/id" + - name: "TASK_JSON" + valueFrom: + fieldRef: + fieldPath: "metadata.annotations['task']" + image: one + name: primary \ No newline at end of file From 9f857ce88c69ac551e4aa5512328820e537be36f Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 4 Apr 2023 21:27:57 -0400 Subject: [PATCH 07/10] Remove log lines --- .../org/apache/druid/k8s/overlord/KubernetesTaskRunner.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index b5077543b89f..5725080674c0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -338,7 +338,6 @@ public Map getTotalTaskSlotCount() public Collection getKnownTasks() { List result = new ArrayList<>(); - log.info("Get known tasks"); for (Job existingTask : client.listAllPeonJobs()) { log.info(existingTask.getMetadata().getName()); try { @@ -427,7 +426,6 @@ public void registerListener(TaskRunnerListener listener, Executor executor) public Collection getRunningTasks() { List result = new ArrayList<>(); - log.info("Get running tasks"); for (Job existingTask : client.listAllPeonJobs().stream() .filter(job -> job.getStatus() != null && job.getStatus().getActive() != null && job.getStatus().getActive() > 0).collect(Collectors.toSet()) ) { From c3c554cf119bc7090da97ca0a6c17ed864a3c75b Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 4 Apr 2023 21:41:24 -0400 Subject: [PATCH 08/10] remove log lines --- .../org/apache/druid/k8s/overlord/KubernetesTaskRunner.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 5725080674c0..7f716fee108f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -339,7 +339,6 @@ public Collection getKnownTasks() { List result = new ArrayList<>(); for (Job existingTask : client.listAllPeonJobs()) { - log.info(existingTask.getMetadata().getName()); try { Task task = adapter.toTask(existingTask); ListenableFuture future = run(task); @@ -429,7 +428,6 @@ public Collection getRunningTasks() for (Job existingTask : client.listAllPeonJobs().stream() .filter(job -> job.getStatus() != null && job.getStatus().getActive() != null && job.getStatus().getActive() > 0).collect(Collectors.toSet()) ) { - log.info(existingTask.getMetadata().getName()); try { Task task = adapter.toTask(existingTask); ListenableFuture future = run(task); From d52fab91f68aacadb23b931e132c9166fa108dc7 Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 5 Apr 2023 11:11:10 -0400 Subject: [PATCH 09/10] PR change requests --- .../k8s/overlord/KubernetesTaskRunner.java | 5 +- .../common/DruidKubernetesPeonClient.java | 21 +------ .../druid/k8s/overlord/common/JobStatus.java | 50 ++++++++++++++++ .../overlord/common/KubernetesPeonClient.java | 5 -- .../common/DruidKubernetesPeonClientTest.java | 20 ------- .../k8s/overlord/common/JobStatusTest.java | 58 +++++++++++++++++++ 6 files changed, 111 insertions(+), 48 deletions(-) create mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobStatus.java create mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobStatusTest.java diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 7f716fee108f..c5c6be5c5365 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -51,6 +51,7 @@ import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; import org.apache.druid.k8s.overlord.common.JobResponse; +import org.apache.druid.k8s.overlord.common.JobStatus; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException; @@ -425,9 +426,7 @@ public void registerListener(TaskRunnerListener listener, Executor executor) public Collection getRunningTasks() { List result = new ArrayList<>(); - for (Job existingTask : client.listAllPeonJobs().stream() - .filter(job -> job.getStatus() != null && job.getStatus().getActive() != null && job.getStatus().getActive() > 0).collect(Collectors.toSet()) - ) { + for (Job existingTask : client.listAllPeonJobs().stream().filter(JobStatus::isActive).collect(Collectors.toSet())) { try { Task task = adapter.toTask(existingTask); ListenableFuture future = run(task); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java index 581dcf7749dc..cbfef8f3d611 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java @@ -35,10 +35,8 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; public class DruidKubernetesPeonClient implements KubernetesPeonClient { @@ -106,7 +104,7 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit .inNamespace(namespace) .withName(taskId.getK8sTaskId()) .waitUntilCondition( - x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null), + x -> !JobStatus.isActive(x), howLong, unit ); @@ -180,23 +178,6 @@ public List listAllPeonJobs() .getItems()); } - @Override - public List listPeonPods(Set phases) - { - return listPeonPods().stream() - .filter(x -> phases.contains(PeonPhase.getPhaseFor(x))) - .collect(Collectors.toList()); - } - - @Override - public List listPeonPods() - { - PodList podList = clientApi.executeRequest(client -> client.pods().inNamespace(namespace)) - .withLabel(DruidK8sConstants.LABEL_KEY) - .list(); - return podList.getItems(); - } - @Override public int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit) { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobStatus.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobStatus.java new file mode 100644 index 000000000000..d6222246823f --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobStatus.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.batch.v1.Job; + +public class JobStatus +{ + + public static boolean isActive(Job job) + { + if (job == null || job.getStatus() == null || job.getStatus().getActive() == null) { + return false; + } + return job.getStatus().getActive() > 0; + } + + public static boolean isSucceeded(Job job) + { + if (job == null || job.getStatus() == null || job.getStatus().getSucceeded() == null) { + return false; + } + return job.getStatus().getSucceeded() > 0; + } + + public static boolean isFailed(Job job) + { + if (job == null || job.getStatus() == null || job.getStatus().getFailed() == null) { + return false; + } + return job.getStatus().getFailed() > 0; + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index f04e1517b059..f9e402f94e29 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -25,7 +25,6 @@ import java.io.InputStream; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -48,10 +47,6 @@ public interface KubernetesPeonClient List listAllPeonJobs(); - List listPeonPods(Set phases); - - List listPeonPods(); - int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit); Pod getMainJobPod(K8sTaskId taskId); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java index b864c730dff5..ebf7d8c0a447 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java @@ -23,8 +23,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodListBuilder; import io.fabric8.kubernetes.api.model.PodTemplateSpec; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -104,24 +102,6 @@ void testTheFlow() Assertions.assertEquals(1, currentJobs.size()); } - @Test - void testListPeonPods() - { - Pod pod = new PodBuilder() - .withNewMetadata() - .withName("foo") - .addToLabels(DruidK8sConstants.LABEL_KEY, "true") - .endMetadata() - .withSpec(K8sTestUtils.getDummyPodSpec()) - .build(); - client.pods().inNamespace("test").create(pod); - DruidKubernetesPeonClient peonClient = new DruidKubernetesPeonClient(new TestKubernetesClient(this.client), "test", - false - ); - List pods = peonClient.listPeonPods(); - Assertions.assertEquals(1, pods.size()); - } - @Test void testCleanup() throws KubernetesResourceNotFoundException { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobStatusTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobStatusTest.java new file mode 100644 index 000000000000..e380f275956e --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobStatusTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import io.fabric8.kubernetes.api.model.batch.v1.JobStatusBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class JobStatusTest +{ + @Test + void testJobsActive() + { + Assertions.assertFalse(JobStatus.isActive(null)); + Assertions.assertFalse(JobStatus.isActive(new JobBuilder().build())); + Assertions.assertFalse(JobStatus.isActive(new JobBuilder().withStatus(new JobStatusBuilder().withActive(null).build()).build())); + Assertions.assertFalse(JobStatus.isActive(new JobBuilder().withStatus(new JobStatusBuilder().withActive(0).build()).build())); + Assertions.assertTrue(JobStatus.isActive(new JobBuilder().withStatus(new JobStatusBuilder().withActive(1).build()).build())); + } + + @Test + void testJobsSucceeded() + { + Assertions.assertFalse(JobStatus.isSucceeded(null)); + Assertions.assertFalse(JobStatus.isSucceeded(new JobBuilder().build())); + Assertions.assertFalse(JobStatus.isSucceeded(new JobBuilder().withStatus(new JobStatusBuilder().withSucceeded(null).build()).build())); + Assertions.assertFalse(JobStatus.isSucceeded(new JobBuilder().withStatus(new JobStatusBuilder().withSucceeded(0).build()).build())); + Assertions.assertTrue(JobStatus.isSucceeded(new JobBuilder().withStatus(new JobStatusBuilder().withSucceeded(1).build()).build())); + } + + @Test + void testJobsFailed() + { + Assertions.assertFalse(JobStatus.isFailed(null)); + Assertions.assertFalse(JobStatus.isFailed(new JobBuilder().build())); + Assertions.assertFalse(JobStatus.isFailed(new JobBuilder().withStatus(new JobStatusBuilder().withFailed(null).build()).build())); + Assertions.assertFalse(JobStatus.isFailed(new JobBuilder().withStatus(new JobStatusBuilder().withFailed(0).build()).build())); + Assertions.assertTrue(JobStatus.isFailed(new JobBuilder().withStatus(new JobStatusBuilder().withFailed(1).build()).build())); + } +} From 3eea4c57e7a9fba5942b18f74bafffab8cb5df0c Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 5 Apr 2023 11:53:58 -0400 Subject: [PATCH 10/10] revert wait change --- .../druid/k8s/overlord/common/DruidKubernetesPeonClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java index cbfef8f3d611..04e3692b6720 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java @@ -104,7 +104,7 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit .inNamespace(namespace) .withName(taskId.getK8sTaskId()) .waitUntilCondition( - x -> !JobStatus.isActive(x), + x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null), howLong, unit );