From b7f842221a65678ceeb3cf606877d6ab1dab5350 Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 14 Feb 2023 10:52:46 -0800 Subject: [PATCH 01/14] Upgrade the fabric client to support newer versions of k8s, add ability to override peon monitors --- .../extensions-contrib/k8s-jobs.md | 1 + .../kubernetes-overlord-extensions/pom.xml | 8 +-- .../overlord/KubernetesTaskRunnerConfig.java | 8 +++ .../overlord/KubernetesTaskRunnerFactory.java | 3 +- .../common/DruidKubernetesClient.java | 13 +++- .../common/DruidKubernetesPeonClient.java | 67 ++++++++--------- .../k8s/overlord/common/K8sTaskAdapter.java | 17 ++++- .../overlord/common/KubernetesClientApi.java | 7 ++ .../overlord/common/LogWatchInputStream.java | 52 ++++++++++++++ .../overlord/common/K8sTaskAdapterTest.java | 49 +++++++++++++ .../k8s/overlord/common/K8sTestUtils.java | 8 +++ .../common/MultiContainerTaskAdapterTest.java | 72 ++++++++++++++----- .../SingleContainerTaskAdapterTest.java | 12 +--- .../overlord/common/TestKubernetesClient.java | 6 ++ licenses.yaml | 2 +- 15 files changed, 254 insertions(+), 71 deletions(-) create mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 178ec91e29bc..d4adc30fbc61 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -66,6 +66,7 @@ Additional Configuration |`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No| |`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No| |`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No| +|`druid.indexer.runner.peonMonitors`|`JsonArray`|An override for the `druid.monitoring.monitors`, For the situation you have monitors setup, and do not want to inherit those from the overlord.|`[]`|No| |`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No| ### Gotchas diff --git a/extensions-contrib/kubernetes-overlord-extensions/pom.xml b/extensions-contrib/kubernetes-overlord-extensions/pom.xml index 6083a0d53fc8..a9b7cf8c4f27 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/pom.xml +++ b/extensions-contrib/kubernetes-overlord-extensions/pom.xml @@ -104,7 +104,7 @@ io.fabric8 kubernetes-model-core - 5.12.2 + 6.4.1 javax.validation @@ -114,12 +114,12 @@ io.fabric8 kubernetes-model-batch - 5.12.2 + 6.4.1 io.fabric8 kubernetes-client - 5.12.2 + 6.4.1 @@ -136,7 +136,7 @@ io.fabric8 kubernetes-server-mock - 5.12.2 + 6.4.1 test diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 82d57d609874..c29293f18eef 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -24,6 +24,7 @@ import org.joda.time.Period; import javax.validation.constraints.NotNull; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -81,6 +82,13 @@ public class KubernetesTaskRunnerConfig // how long to wait for the peon k8s job to launch public Period k8sjobLaunchTimeout = new Period("PT1H"); + @JsonProperty + // ForkingTaskRunner inherits the monitors from the MM, in k8s mode + // the peon inherits the monitors from the overlord, so if someone specifies + // a TaskCountStatsMonitor in the overlord for example, the peon process + // fails because it can not inject this monitor in the peon process. + public List peonMonitors = new ArrayList<>(); + @JsonProperty @NotNull public List javaOptsArray; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index 14de4a092767..8b7c272e5e99 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.indexing.common.TaskStorageDirTracker; @@ -76,7 +77,7 @@ public KubernetesTaskRunner build() { DruidKubernetesClient client; if (kubernetesTaskRunnerConfig.disableClientProxy) { - Config config = Config.autoConfigure(null); + Config config = new ConfigBuilder().build(); config.setHttpsProxy(null); config.setHttpProxy(null); client = new DruidKubernetesClient(config); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java index 6bb297264d66..c39ce9198bb4 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java @@ -20,8 +20,9 @@ package org.apache.druid.k8s.overlord.common; import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; public class DruidKubernetesClient implements KubernetesClientApi { @@ -30,7 +31,7 @@ public class DruidKubernetesClient implements KubernetesClientApi public DruidKubernetesClient() { - this(Config.autoConfigure(null)); + this(new ConfigBuilder().build()); } public DruidKubernetesClient(Config config) @@ -41,8 +42,14 @@ public DruidKubernetesClient(Config config) @Override public T executeRequest(KubernetesExecutor executor) throws KubernetesResourceNotFoundException { - try (KubernetesClient client = new DefaultKubernetesClient(config)) { + try (KubernetesClient client = getClient()) { return executor.executeRequest(client); } } + + @Override + public KubernetesClient getClient() + { + return new KubernetesClientBuilder().withConfig(config).build(); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java index 2aebc37aba19..eecee1467258 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java @@ -25,13 +25,11 @@ import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.KubernetesClient; -import org.apache.commons.io.input.ReaderInputStream; +import io.fabric8.kubernetes.client.dsl.LogWatch; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import java.io.InputStream; -import java.io.Reader; -import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; @@ -78,7 +76,7 @@ public Pod launchJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit) long start = System.currentTimeMillis(); // launch job return clientApi.executeRequest(client -> { - client.batch().v1().jobs().inNamespace(namespace).create(job); + client.batch().v1().jobs().inNamespace(namespace).resource(job).create(); K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName()); log.info("Successfully submitted job: %s ... waiting for job to launch", taskId); // wait until the pod is running or complete or failed, any of those is fine @@ -121,12 +119,12 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit public boolean cleanUpJob(K8sTaskId taskId) { if (!debugJobs) { - Boolean result = clientApi.executeRequest(client -> client.batch() - .v1() - .jobs() - .inNamespace(namespace) - .withName(taskId.getK8sTaskId()) - .delete()); + Boolean result = clientApi.executeRequest(client -> !client.batch() + .v1() + .jobs() + .inNamespace(namespace) + .withName(taskId.getK8sTaskId()) + .delete().isEmpty()); if (result) { log.info("Cleaned up k8s task: %s", taskId); } else { @@ -160,23 +158,24 @@ public String getJobLogs(K8sTaskId taskId) @Override public Optional getPeonLogs(K8sTaskId taskId) { + KubernetesClient k8sClient = clientApi.getClient(); try { - return clientApi.executeRequest(client -> { - Reader reader = client.batch() - .v1() - .jobs() - .inNamespace(namespace) - .withName(taskId.getK8sTaskId()) - .inContainer("main") - .getLogReader(); - if (reader == null) { - return Optional.absent(); - } - return Optional.of(new ReaderInputStream(reader, StandardCharsets.UTF_8)); - }); + LogWatch logWatch = k8sClient.batch() + .v1() + .jobs() + .inNamespace(namespace) + .withName(taskId.getK8sTaskId()) + .inContainer("main") + .watchLog(); + if (logWatch == null) { + k8sClient.close(); + return Optional.absent(); + } + return Optional.of(new LogWatchInputStream(k8sClient, logWatch)); } catch (Exception e) { - log.error("Error streaming logs from task: %s", taskId); + log.error(e, "Error streaming logs from task: %s", taskId); + k8sClient.close(); return Optional.absent(); } } @@ -197,17 +196,17 @@ public List listAllPeonJobs() public List listPeonPods(Set phases) { return listPeonPods().stream() - .filter(x -> phases.contains(PeonPhase.getPhaseFor(x))) - .collect(Collectors.toList()); + .filter(x -> phases.contains(PeonPhase.getPhaseFor(x))) + .collect(Collectors.toList()); } @Override public List listPeonPods() { - PodList podList = clientApi.executeRequest(client -> client.pods().inNamespace(namespace)) - .withLabel(DruidK8sConstants.LABEL_KEY) - .list(); - return podList.getItems(); + return clientApi.executeRequest(client -> client.pods().inNamespace(namespace) + .withLabel(DruidK8sConstants.LABEL_KEY) + .list().getItems()); + } @Override @@ -217,7 +216,12 @@ public int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit) return clientApi.executeRequest(client -> { List jobs = getJobsToCleanup(listAllPeonJobs(), howFarBack, timeUnit); jobs.forEach(x -> { - if (client.batch().v1().jobs().inNamespace(namespace).withName(x.getMetadata().getName()).delete()) { + if (!client.batch() + .v1() + .jobs() + .inNamespace(namespace) + .withName(x.getMetadata().getName()) + .delete().isEmpty()) { numDeleted.incrementAndGet(); } }); @@ -271,5 +275,4 @@ Pod getMainJobPod(KubernetesClient client, K8sTaskId taskId) throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found"); } } - } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java index b0672113996f..229809ce812f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java @@ -19,6 +19,7 @@ package org.apache.druid.k8s.overlord.common; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -96,6 +97,7 @@ public Job fromTask(Task task, PeonCommandContext context) throws IOException @Override public Task toTask(Pod from) throws IOException { + // all i have to do here is grab the main container...done PodSpec podSpec = from.getSpec(); massageSpec(podSpec, "main"); List envVars = podSpec.getContainers().get(0).getEnv(); @@ -176,8 +178,19 @@ protected void setupPorts(Container mainContainer) mainContainer.setPorts(Lists.newArrayList(httpsPort, tcpPort)); } - protected void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents) + @VisibleForTesting + void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents) + throws JsonProcessingException { + // if the peon monitors are set, override the overlord's monitors (if set) with the peon monitors + if (!config.peonMonitors.isEmpty()) { + mainContainer.getEnv().removeIf(x -> "druid_monitoring_monitors".equals(x.getName())); + mainContainer.getEnv().add(new EnvVarBuilder() + .withName("druid_monitoring_monitors") + .withValue(mapper.writeValueAsString(config.peonMonitors)) + .build()); + } + mainContainer.getEnv().addAll(Lists.newArrayList( new EnvVarBuilder() .withName(DruidK8sConstants.TASK_DIR_ENV) @@ -211,7 +224,7 @@ protected Container setupMainContainer( PeonCommandContext context, long containerSize, String taskContents - ) + ) throws JsonProcessingException { // prepend the startup task.json extraction command List mainCommand = Lists.newArrayList("sh", "-c"); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesClientApi.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesClientApi.java index 44b0464fcfa8..655e4435f205 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesClientApi.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesClientApi.java @@ -19,8 +19,15 @@ package org.apache.druid.k8s.overlord.common; +import io.fabric8.kubernetes.client.KubernetesClient; + // Wraps all kubernetes api calls, to ensure you open and close connections properly public interface KubernetesClientApi { T executeRequest(KubernetesExecutor executor) throws KubernetesResourceNotFoundException; + + // use only when handling streams of data, example if you want to pass around an input stream from a pod, + // then you would call this instead of executeRequest as you would want to keep the connection open until you + // are done with the stream. Callers responsibility to clean up when using this method + KubernetesClient getClient(); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java new file mode 100644 index 000000000000..0c3f87d3a95d --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.LogWatch; + +import java.io.IOException; +import java.io.InputStream; + +public class LogWatchInputStream extends InputStream +{ + + private final KubernetesClient client; + private final LogWatch logWatch; + + public LogWatchInputStream(KubernetesClient client, LogWatch logWatch) + { + this.client = client; + this.logWatch = logWatch; + } + + @Override + public int read() throws IOException + { + return logWatch.getOutput().read(); + } + + @Override + public void close() throws IOException + { + logWatch.close(); + client.close(); + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java index 59ef5b430c1d..ef7f0189a599 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java @@ -26,6 +26,8 @@ import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.PodSpec; @@ -205,4 +207,51 @@ void testNoPrimaryFound() }); } + @Test + void testAddingMonitors() throws IOException + { + TestKubernetesClient testClient = new TestKubernetesClient(client); + PeonCommandContext context = new PeonCommandContext( + new ArrayList<>(), + new ArrayList<>(), + new File("/tmp/") + ); + KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); + config.namespace = "test"; + K8sTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper); + Task task = K8sTestUtils.getTask(); + // no monitor in overlord, no monitor override + Container container = new ContainerBuilder() + .withName("container").build(); + adapter.addEnvironmentVariables(container, context, task.toString()); + assertFalse(container.getEnv().stream().anyMatch(x -> x.getName().equals("druid_monitoring_monitors"))); + + // we have an override, but nothing in the overlord + config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class); + adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper); + adapter.addEnvironmentVariables(container, context, task.toString()); + EnvVar env = container.getEnv() + .stream() + .filter(x -> x.getName().equals("druid_monitoring_monitors")) + .findFirst() + .get(); + assertEquals(jsonMapper.writeValueAsString(config.peonMonitors), env.getValue()); + + // we override what is in the overlord + config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class); + adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper); + container.getEnv().add(new EnvVarBuilder() + .withName("druid_monitoring_monitors") + .withValue( + "'[\"org.apache.druid.java.util.metrics.JvmMonitor\", " + + "\"org.apache.druid.server.metrics.TaskCountStatsMonitor\"]'") + .build()); + adapter.addEnvironmentVariables(container, context, task.toString()); + env = container.getEnv() + .stream() + .filter(x -> x.getName().equals("druid_monitoring_monitors")) + .findFirst() + .get(); + assertEquals(jsonMapper.writeValueAsString(config.peonMonitors), env.getValue()); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java index 0f454102986d..503708f77430 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java @@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.PodSpecBuilder; import io.fabric8.kubernetes.api.model.PodTemplateSpec; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.utils.Serialization; import org.apache.commons.text.CharacterPredicates; import org.apache.commons.text.RandomStringGenerator; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -142,4 +143,11 @@ public static Task getTask() ); } + public static T fileToResource(String contents, Class type) + { + return Serialization.unmarshal( + MultiContainerTaskAdapter.class.getClassLoader().getResourceAsStream(contents), + type + ); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java index f1c7b390de52..2132f7985635 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.List; @EnableKubernetesMockClient(crud = true) class MultiContainerTaskAdapterTest @@ -66,7 +67,7 @@ public MultiContainerTaskAdapterTest() public void testMultiContainerSupport() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); - Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml")).get(); + Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class); KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); config.namespace = "test"; MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, config, jsonMapper); @@ -79,11 +80,7 @@ public void testMultiContainerSupport() throws IOException new File("/tmp") ) ); - Job expected = client.batch() - .v1() - .jobs() - .load(this.getClass().getClassLoader().getResourceAsStream("expectedMultiContainerOutput.yaml")) - .get(); + Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutput.yaml", Job.class); // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, // this would never happen in real life, but for the jdk 17 tests this is a problem @@ -109,7 +106,7 @@ public void testMultiContainerSupport() throws IOException public void testMultiContainerSupportWithNamedContainer() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); - Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpecOrder.yaml")).get(); + Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpecOrder.yaml", Pod.class); KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); config.namespace = "test"; config.primaryContainerName = "primary"; @@ -118,29 +115,68 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException PodSpec spec = pod.getSpec(); K8sTaskAdapter.massageSpec(spec, "primary"); Job actual = adapter.createJobFromPodSpec( - spec, - task, - new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), - new ArrayList<>(), - new File("/tmp") - ) + spec, + task, + new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), + new ArrayList<>(), + new File("/tmp") + ) ); - Job expected = client.batch() - .v1() - .jobs() - .load(this.getClass().getClassLoader().getResourceAsStream("expectedMultiContainerOutputOrder.yaml")) - .get(); + Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutputOrder.yaml", Job.class); // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, // this would never happen in real life, but for the jdk 17 tests this is a problem // could be related to: https://bugs.openjdk.org/browse/JDK-8081450 actual.getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getEnv() + .removeIf(x -> x.getName().equals("TASK_JSON")); + expected.getSpec() .getTemplate() .getSpec() .getContainers() .get(0) .getEnv() .removeIf(x -> x.getName().equals("TASK_JSON")); + Assertions.assertEquals(expected, actual); + } + + @Test + public void testOverridingPeonMonitors() throws IOException + { + TestKubernetesClient testClient = new TestKubernetesClient(client); + Pod pod = K8sTestUtils.fileToResource("podSpec.yaml", Pod.class); + KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); + config.namespace = "test"; + config.primaryContainerName = "primary"; + config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class); + MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, config, jsonMapper); + NoopTask task = NoopTask.create("id", 1); + PodSpec spec = pod.getSpec(); + K8sTaskAdapter.massageSpec(spec, config.primaryContainerName); + Job actual = adapter.createJobFromPodSpec( + spec, + task, + new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), + new ArrayList<>(), + new File("/tmp") + ) + ); + Job expected = K8sTestUtils.fileToResource("expectedPodSpec.yaml", Job.class); + + // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, + // this would never happen in real life, but for the jdk 17 tests this is a problem + // could be related to: https://bugs.openjdk.org/browse/JDK-8081450 + actual.getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getEnv() + .removeIf(x -> x.getName().equals("TASK_JSON")); expected.getSpec() .getTemplate() .getSpec() diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java index d19ac42f5e92..3ae20f35cb69 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java @@ -65,9 +65,7 @@ public SingleContainerTaskAdapterTest() public void testSingleContainerSupport() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); - Pod pod = client.pods() - .load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml")) - .get(); + Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class); KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); config.namespace = "test"; SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper); @@ -81,13 +79,7 @@ public void testSingleContainerSupport() throws IOException new File("/tmp") ) ); - Job expected = client.batch() - .v1() - .jobs() - .load(this.getClass() - .getClassLoader() - .getResourceAsStream("expectedSingleiContainerOutput.yaml")) - .get(); + Job expected = K8sTestUtils.fileToResource("expectedSingleiContainerOutput.yaml", Job.class); // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, // this would never happen in real life, but for the jdk 17 tests this is a problem // could be related to: https://bugs.openjdk.org/browse/JDK-8081450 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java index 2cd1538cd727..57be98251a9a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java @@ -36,4 +36,10 @@ public T executeRequest(KubernetesExecutor executor) throws KubernetesRes { return executor.executeRequest(client); } + + @Override + public KubernetesClient getClient() + { + return client; + } } diff --git a/licenses.yaml b/licenses.yaml index 696478d8516d..6f249eb626c3 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -901,7 +901,7 @@ name: kubernetes fabric java client license_category: binary module: extensions-contrib/kubernetes-overlord-extensions license_name: Apache License version 2.0 -version: 5.12.2 +version: 6.4.1 libraries: - io.fabric8: kubernetes-client From ae4248f8e1ef9284053ff138ff92ffd944da8a8e Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 14 Feb 2023 11:46:14 -0800 Subject: [PATCH 02/14] Fix dependencies check in build --- extensions-contrib/kubernetes-overlord-extensions/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/extensions-contrib/kubernetes-overlord-extensions/pom.xml b/extensions-contrib/kubernetes-overlord-extensions/pom.xml index a9b7cf8c4f27..86ed3bae78c4 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/pom.xml +++ b/extensions-contrib/kubernetes-overlord-extensions/pom.xml @@ -116,6 +116,11 @@ kubernetes-model-batch 6.4.1 + + io.fabric8 + kubernetes-client-api + 6.4.1 + io.fabric8 kubernetes-client From c77cb94ef812f259b5a31e962a36af84d5799acb Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 14 Feb 2023 12:46:56 -0800 Subject: [PATCH 03/14] Have to remove the concrete dependency and keep the api --- extensions-contrib/kubernetes-overlord-extensions/pom.xml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/pom.xml b/extensions-contrib/kubernetes-overlord-extensions/pom.xml index 86ed3bae78c4..4ab24ffe41c1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/pom.xml +++ b/extensions-contrib/kubernetes-overlord-extensions/pom.xml @@ -121,11 +121,6 @@ kubernetes-client-api 6.4.1 - - io.fabric8 - kubernetes-client - 6.4.1 - From 922afaa3505d5f902b66875929d0de1de3879fb1 Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 14 Feb 2023 14:50:41 -0800 Subject: [PATCH 04/14] Forgot to update the expected file in this commit --- .../DruidPeonClientIntegrationTest.java | 1 + .../src/test/resources/expectedPodSpec.yaml | 91 ++++++++++++++----- 2 files changed, 67 insertions(+), 25 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java index 72acaf42b1b9..29c0f5c5cc37 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java @@ -54,6 +54,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; // must have a kind / minikube cluster installed and the image pushed to your repository +@Disabled public class DruidPeonClientIntegrationTest { private final KubernetesClientApi k8sClient; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml index 9b2b6a8e1f17..939879a4af2c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml @@ -2,29 +2,50 @@ apiVersion: "batch/v1" kind: "Job" metadata: annotations: - task.id: "task_id" + task.id: "id" tls.enabled: "false" labels: druid.k8s.peons: "true" - name: "taskid" + name: "id" spec: - activeDeadlineSeconds: 3600 + activeDeadlineSeconds: 14400 backoffLimit: 0 template: metadata: annotations: - task.id: "task_id" + task.id: "id" tls.enabled: "false" labels: druid.k8s.peons: "true" spec: containers: - args: - - "trap 'touch /usr/share/pod/done' EXIT; mkdir -p ${TASK_DIR}; echo ${TASK_JSON}\ - \ | base64 -d | gzip -d > ${TASK_DIR}/task.json; " + - "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6\ + \ 1\"" command: - - "sh" + - "/bin/sh" - "-c" + env: + - name: "druid_monitoring_monitors" + value: '["org.apache.druid.java.util.metrics.JvmMonitor"]' + - name: "TASK_DIR" + value: "/tmp" + - name: "TASK_JSON" + value: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA=" + - name: "JAVA_OPTS" + value: "" + - name: "druid_host" + valueFrom: + fieldRef: + fieldPath: "status.podIP" + - name: "HOSTNAME" + valueFrom: + fieldRef: + fieldPath: "metadata.name" + - name: "KUBEXIT_NAME" + value: "main" + - name: "KUBEXIT_GRAVEYARD" + value: "/graveyard" image: "one" name: "main" ports: @@ -37,30 +58,50 @@ spec: resources: limits: cpu: "1000m" - memory: "1000000000" + memory: "2400000000" requests: cpu: "1000m" - memory: "1000000000" + memory: "2400000000" volumeMounts: - - mountPath: "/usr/share/pod" - name: "peon-share" - - command: - - "tail -f /dev/null" + - mountPath: "/graveyard" + name: "graveyard" + - mountPath: "/kubexit" + name: "kubexit" + - args: + - "/kubexit/kubexit /bin/sh -c \"tail -f /dev/null\" || true" + command: + - "/bin/sh" + - "-c" + env: + - name: "KUBEXIT_NAME" + value: "sidecar" + - name: "KUBEXIT_GRAVEYARD" + value: "/graveyard" + - name: "KUBEXIT_DEATH_DEPS" + value: "main" image: "two" - lifecycle: - postStart: - exec: - command: - - "while ! test -f /usr/share/pod/done; do echo 'Waiting for the main\ - \ pod to finish...'; sleep 5; done; echo 'Agent pod finished, exiting';\ - \ exit 0" name: "sidecar" volumeMounts: - - mountPath: "/usr/share/pod" - name: "peon-share" - readOnly: true + - mountPath: "/graveyard" + name: "graveyard" + - mountPath: "/kubexit" + name: "kubexit" + hostname: "id" + initContainers: + - command: + - "cp" + - "/bin/kubexit" + - "/kubexit/kubexit" + image: "karlkfi/kubexit:v0.3.2" + name: "kubexit" + volumeMounts: + - mountPath: "/kubexit" + name: "kubexit" restartPolicy: "Never" volumes: + - emptyDir: + medium: "Memory" + name: "graveyard" - emptyDir: {} - name: "peon-share" - ttlSecondsAfterFinished: 7200 \ No newline at end of file + name: "kubexit" + ttlSecondsAfterFinished: 172800 From e65577725ca2da3b47c777b0086eaa53044d7f91 Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 14 Feb 2023 17:31:19 -0800 Subject: [PATCH 05/14] Adding code coverage, fixing some inspections --- .../overlord/common/LogWatchInputStream.java | 7 ++- .../common/DruidKubernetesClientTest.java | 54 +++++++++++++++++++ .../common/LogWatchInputStreamTest.java | 54 +++++++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClientTest.java create mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java index 0c3f87d3a95d..3665e940f349 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java @@ -25,6 +25,11 @@ import java.io.IOException; import java.io.InputStream; +/** + * This wraps the InputStream for k8s client + * When you call close on the stream, it will also close the open + * http connections and the client + */ public class LogWatchInputStream extends InputStream { @@ -44,7 +49,7 @@ public int read() throws IOException } @Override - public void close() throws IOException + public void close() { logWatch.close(); client.close(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClientTest.java new file mode 100644 index 000000000000..c8a157c1b51f --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClientTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.client.Client; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +class DruidKubernetesClientTest +{ + + @Test + void makingCodeCoverageHappy() + { + KubernetesClient client = mock(KubernetesClient.class); + when(client.getApiVersion()).thenReturn("foo"); + DruidKubernetesClient k8sClient = new DruidKubernetesClient(mock(Config.class)) + { + @Override + public KubernetesClient getClient() + { + return client; + } + }; + String version = k8sClient.executeRequest(Client::getApiVersion); + assertEquals("foo", version); + Mockito.verify(client, times(1)).close(); + + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java new file mode 100644 index 000000000000..c3245126aa3b --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class LogWatchInputStreamTest +{ + + @Test + void testFlow() throws IOException + { + LogWatch logWatch = mock(LogWatch.class); + InputStream inputStream = mock(InputStream.class); + when(inputStream.read()).thenReturn(1); + when(logWatch.getOutput()).thenReturn(inputStream); + KubernetesClient client = mock(KubernetesClient.class); + LogWatchInputStream stream = new LogWatchInputStream(client, logWatch); + int result = stream.read(); + Assertions.assertEquals(1, result); + verify(stream, times(1)).read(); + stream.close(); + verify(stream, times(1)).close(); + verify(client, times(1)).close(); + } +} From 9d14e46c2229bf9368d7c2f780146c244b3e1ae6 Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 14 Feb 2023 18:38:56 -0800 Subject: [PATCH 06/14] Made a mistake on which mocks to call verify on --- .../druid/k8s/overlord/common/LogWatchInputStreamTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java index c3245126aa3b..ffb60fda99f2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java @@ -46,9 +46,9 @@ void testFlow() throws IOException LogWatchInputStream stream = new LogWatchInputStream(client, logWatch); int result = stream.read(); Assertions.assertEquals(1, result); - verify(stream, times(1)).read(); + verify(inputStream, times(1)).read(); stream.close(); - verify(stream, times(1)).close(); + verify(logWatch, times(1)).close(); verify(client, times(1)).close(); } } From 9be2a2360563cbcea634c310ec89dd106674de89 Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 14 Feb 2023 18:56:39 -0800 Subject: [PATCH 07/14] Have to add the client back as a runtime dependency, otherwise we get class not founds at runtime --- extensions-contrib/kubernetes-overlord-extensions/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/extensions-contrib/kubernetes-overlord-extensions/pom.xml b/extensions-contrib/kubernetes-overlord-extensions/pom.xml index 4ab24ffe41c1..0a899094ce12 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/pom.xml +++ b/extensions-contrib/kubernetes-overlord-extensions/pom.xml @@ -121,6 +121,12 @@ kubernetes-client-api 6.4.1 + + io.fabric8 + kubernetes-client + 6.4.1 + runtime + From 4d76e5fa94bc4ca779e0b59480a9cce2d0c89beb Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Fri, 17 Feb 2023 09:52:48 -0800 Subject: [PATCH 08/14] Last change and some log fixes to work with k8s 1.25 --- .../org/apache/druid/k8s/overlord/KubernetesTaskRunner.java | 3 ++- .../druid/k8s/overlord/common/DruidKubernetesPeonClient.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 23fb915f3a89..ba5cdd5481df 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -197,7 +197,7 @@ public ListenableFuture run(Task task) } else { status = TaskStatus.failure( task.getId(), - "Task failed %s: " + k8sTaskId + "Task failed: " + k8sTaskId ); } if (completedPhase.getJobDuration().isPresent()) { @@ -256,6 +256,7 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId) @Override public void updateStatus(Task task, TaskStatus status) { + log.info("Updating status %s", status); TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java index eecee1467258..fc0c8e88a39e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java @@ -104,13 +104,15 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit .inNamespace(namespace) .withName(taskId.getK8sTaskId()) .waitUntilCondition( - x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null, + x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null + && (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() !=null), howLong, unit ); if (job.getStatus().getSucceeded() != null) { return new JobResponse(job, PeonPhase.SUCCEEDED); } + log.warn("Task %s failed with status %s", taskId, job.getStatus()); return new JobResponse(job, PeonPhase.FAILED); }); } From 1e977682fcd86266817f11ba22ad026bf8348a19 Mon Sep 17 00:00:00 2001 From: Churro <88559693+churromorales@users.noreply.github.com> Date: Tue, 28 Feb 2023 12:45:40 -0800 Subject: [PATCH 09/14] Update docs/development/extensions-contrib/k8s-jobs.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> --- docs/development/extensions-contrib/k8s-jobs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index d4adc30fbc61..248791c43fac 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -66,7 +66,7 @@ Additional Configuration |`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No| |`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No| |`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No| -|`druid.indexer.runner.peonMonitors`|`JsonArray`|An override for the `druid.monitoring.monitors`, For the situation you have monitors setup, and do not want to inherit those from the overlord.|`[]`|No| +|`druid.indexer.runner.peonMonitors`|`JsonArray`|Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the overlord.|`[]`|No| |`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No| ### Gotchas From 55875f299aa53d5022bbe89540e66b0884499d58 Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 28 Feb 2023 12:46:19 -0800 Subject: [PATCH 10/14] Changes per PR review --- .../org/apache/druid/k8s/overlord/KubernetesTaskRunner.java | 2 +- .../apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index ba5cdd5481df..674c07f3ed6f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -256,7 +256,7 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId) @Override public void updateStatus(Task task, TaskStatus status) { - log.info("Updating status %s", status); + log.info("Updating task: %s with status %s", task.getId(), status); TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java index ef7f0189a599..2d5fc16137a9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.api.client.util.Joiner; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.Container; @@ -224,7 +225,8 @@ void testAddingMonitors() throws IOException Container container = new ContainerBuilder() .withName("container").build(); adapter.addEnvironmentVariables(container, context, task.toString()); - assertFalse(container.getEnv().stream().anyMatch(x -> x.getName().equals("druid_monitoring_monitors"))); + assertFalse(container.getEnv().stream().anyMatch(x -> x.getName().equals("druid_monitoring_monitors")), + "Didn't match, envs: " + Joiner.on(',').join(container.getEnv())); // we have an override, but nothing in the overlord config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class); From 4e8bb9da53df54b68c2a4f67b753cf264e5eed5d Mon Sep 17 00:00:00 2001 From: Churro <88559693+churromorales@users.noreply.github.com> Date: Fri, 10 Mar 2023 11:52:23 -0800 Subject: [PATCH 11/14] Update docs/development/extensions-contrib/k8s-jobs.md Co-authored-by: Charles Smith --- docs/development/extensions-contrib/k8s-jobs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 248791c43fac..c9aa187852d4 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -66,7 +66,7 @@ Additional Configuration |`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No| |`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No| |`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No| -|`druid.indexer.runner.peonMonitors`|`JsonArray`|Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the overlord.|`[]`|No| +|`druid.indexer.runner.peonMonitors`|`JsonArray`|Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord.|`[]`|No| |`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No| ### Gotchas From c28d26052ddcd8792f9d58b2cc74cc7119301f85 Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Mon, 13 Mar 2023 15:44:05 -0700 Subject: [PATCH 12/14] Small fix to make the config match the docs and fixed a typo in the tests caused by a merge conflict --- .../apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java | 2 +- .../k8s/overlord/common/SingleContainerTaskAdapterTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index c29293f18eef..b12e8342e3e9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -91,7 +91,7 @@ public class KubernetesTaskRunnerConfig @JsonProperty @NotNull - public List javaOptsArray; + public List javaOptsArray = new ArrayList<>(); @JsonProperty @NotNull diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java index 3b5b9e21af25..6fd04a5f3057 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java @@ -80,7 +80,7 @@ public void testSingleContainerSupport() throws IOException ) ); - Job expected = K8sTestUtils.fileToResource("expectedSingleiContainerOutput.yaml", Job.class); + Job expected = K8sTestUtils.fileToResource("expectedSingleContainerOutput.yaml", Job.class); // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, // this would never happen in real life, but for the jdk 17 tests this is a problem // could be related to: https://bugs.openjdk.org/browse/JDK-8081450 From 82e316ab614fcdc42204ab2b6db3a4ed9cc7224c Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Tue, 28 Mar 2023 10:23:15 -0700 Subject: [PATCH 13/14] Removing the un-needed code coverage happy test --- .../common/DruidKubernetesClientTest.java | 54 ------------------- 1 file changed, 54 deletions(-) delete mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClientTest.java diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClientTest.java deleted file mode 100644 index c8a157c1b51f..000000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClientTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.k8s.overlord.common; - -import io.fabric8.kubernetes.client.Client; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.KubernetesClient; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; - -class DruidKubernetesClientTest -{ - - @Test - void makingCodeCoverageHappy() - { - KubernetesClient client = mock(KubernetesClient.class); - when(client.getApiVersion()).thenReturn("foo"); - DruidKubernetesClient k8sClient = new DruidKubernetesClient(mock(Config.class)) - { - @Override - public KubernetesClient getClient() - { - return client; - } - }; - String version = k8sClient.executeRequest(Client::getApiVersion); - assertEquals("foo", version); - Mockito.verify(client, times(1)).close(); - - } -} From 28711503a9ae91a979cef0f852807157e781aebe Mon Sep 17 00:00:00 2001 From: Rahul Gidwani Date: Wed, 29 Mar 2023 10:58:58 -0700 Subject: [PATCH 14/14] Merge conflicts due to the PodTemplateTaskAdapter commit --- .../overlord/KubernetesTaskRunnerFactory.java | 1 + .../k8s/overlord/common/K8sTaskAdapter.java | 4 +-- .../overlord/common/K8sTaskAdapterTest.java | 35 +++++++++++++++---- .../common/MultiContainerTaskAdapterTest.java | 7 +++- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index a9589a1e565b..38cbeab2c729 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; +import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.indexing.common.config.TaskConfig; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java index 380c89e85534..4bbecab7dd05 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java @@ -206,11 +206,11 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context throws JsonProcessingException { // if the peon monitors are set, override the overlord's monitors (if set) with the peon monitors - if (!config.peonMonitors.isEmpty()) { + if (!taskRunnerConfig.peonMonitors.isEmpty()) { mainContainer.getEnv().removeIf(x -> "druid_monitoring_monitors".equals(x.getName())); mainContainer.getEnv().add(new EnvVarBuilder() .withName("druid_monitoring_monitors") - .withValue(mapper.writeValueAsString(config.peonMonitors)) + .withValue(mapper.writeValueAsString(taskRunnerConfig.peonMonitors)) .build()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java index a7fdb2dbb5ec..cc6c6fec4362 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java @@ -70,7 +70,7 @@ class K8sTaskAdapterTest private final StartupLoggingConfig startupLoggingConfig; private final TaskConfig taskConfig; private final DruidNode node; - private ObjectMapper jsonMapper; + private final ObjectMapper jsonMapper; public K8sTaskAdapterTest() { @@ -267,18 +267,34 @@ void testAddingMonitors() throws IOException ); KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); config.namespace = "test"; - K8sTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper); + K8sTaskAdapter adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper + ); Task task = K8sTestUtils.getTask(); // no monitor in overlord, no monitor override Container container = new ContainerBuilder() .withName("container").build(); adapter.addEnvironmentVariables(container, context, task.toString()); - assertFalse(container.getEnv().stream().anyMatch(x -> x.getName().equals("druid_monitoring_monitors")), - "Didn't match, envs: " + Joiner.on(',').join(container.getEnv())); + assertFalse( + container.getEnv().stream().anyMatch(x -> x.getName().equals("druid_monitoring_monitors")), + "Didn't match, envs: " + Joiner.on(',').join(container.getEnv()) + ); // we have an override, but nothing in the overlord config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class); - adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper); + adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper + ); adapter.addEnvironmentVariables(container, context, task.toString()); EnvVar env = container.getEnv() .stream() @@ -289,7 +305,14 @@ void testAddingMonitors() throws IOException // we override what is in the overlord config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class); - adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper); + adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper + ); container.getEnv().add(new EnvVarBuilder() .withName("druid_monitoring_monitors") .withValue( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java index 51b4e7de6e10..06633bb4be5a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java @@ -201,7 +201,12 @@ public void testOverridingPeonMonitors() throws IOException config.namespace = "test"; config.primaryContainerName = "primary"; config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class); - MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, config, jsonMapper); + MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, + config, + taskConfig, + startupLoggingConfig, + druidNode, + jsonMapper); NoopTask task = NoopTask.create("id", 1); PodSpec spec = pod.getSpec(); K8sTaskAdapter.massageSpec(spec, config.primaryContainerName);