diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md
index e3bf019cc94e..5cbf4c507be8 100644
--- a/docs/development/extensions-contrib/k8s-jobs.md
+++ b/docs/development/extensions-contrib/k8s-jobs.md
@@ -81,6 +81,7 @@ Additional Configuration
|`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No|
|`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No|
+|`druid.indexer.runner.peonMonitors`|`JsonArray`|Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord.|`[]`|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No|
### Gotchas
diff --git a/extensions-contrib/kubernetes-overlord-extensions/pom.xml b/extensions-contrib/kubernetes-overlord-extensions/pom.xml
index f2a775ec2db4..62b33bea4e40 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/pom.xml
+++ b/extensions-contrib/kubernetes-overlord-extensions/pom.xml
@@ -98,7 +98,7 @@
io.fabric8
kubernetes-model-core
- 5.12.2
+ 6.4.1
javax.validation
@@ -108,12 +108,18 @@
io.fabric8
kubernetes-model-batch
- 5.12.2
+ 6.4.1
+
+
+ io.fabric8
+ kubernetes-client-api
+ 6.4.1
io.fabric8
kubernetes-client
- 5.12.2
+ 6.4.1
+ runtime
@@ -130,7 +136,7 @@
io.fabric8
kubernetes-server-mock
- 5.12.2
+ 6.4.1
test
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index ae2dbbb885a5..1a9a9a3b4cbf 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -172,7 +172,7 @@ public ListenableFuture run(Task task)
} else {
status = TaskStatus.failure(
task.getId(),
- "Task failed %s: " + k8sTaskId
+ "Task failed: " + k8sTaskId
);
}
if (completedPhase.getJobDuration().isPresent()) {
@@ -229,6 +229,7 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId)
@Override
public void updateStatus(Task task, TaskStatus status)
{
+ log.info("Updating task: %s with status %s", task.getId(), status);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index c8f975dbbd20..d83b81b15965 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -24,6 +24,7 @@
import org.joda.time.Period;
import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -87,9 +88,16 @@ public class KubernetesTaskRunnerConfig
// how long to wait for the peon k8s job to launch
public Period k8sjobLaunchTimeout = new Period("PT1H");
+ @JsonProperty
+ // ForkingTaskRunner inherits the monitors from the MM, in k8s mode
+ // the peon inherits the monitors from the overlord, so if someone specifies
+ // a TaskCountStatsMonitor in the overlord for example, the peon process
+ // fails because it can not inject this monitor in the peon process.
+ public List peonMonitors = new ArrayList<>();
+
@JsonProperty
@NotNull
- public List javaOptsArray;
+ public List javaOptsArray = new ArrayList<>();
@JsonProperty
@NotNull
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index e86e1700aaaa..38cbeab2c729 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
@@ -85,7 +86,7 @@ public KubernetesTaskRunner build()
{
DruidKubernetesClient client;
if (kubernetesTaskRunnerConfig.disableClientProxy) {
- Config config = Config.autoConfigure(null);
+ Config config = new ConfigBuilder().build();
config.setHttpsProxy(null);
config.setHttpProxy(null);
client = new DruidKubernetesClient(config);
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java
index 6bb297264d66..c39ce9198bb4 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java
@@ -20,8 +20,9 @@
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
public class DruidKubernetesClient implements KubernetesClientApi
{
@@ -30,7 +31,7 @@ public class DruidKubernetesClient implements KubernetesClientApi
public DruidKubernetesClient()
{
- this(Config.autoConfigure(null));
+ this(new ConfigBuilder().build());
}
public DruidKubernetesClient(Config config)
@@ -41,8 +42,14 @@ public DruidKubernetesClient(Config config)
@Override
public T executeRequest(KubernetesExecutor executor) throws KubernetesResourceNotFoundException
{
- try (KubernetesClient client = new DefaultKubernetesClient(config)) {
+ try (KubernetesClient client = getClient()) {
return executor.executeRequest(client);
}
}
+
+ @Override
+ public KubernetesClient getClient()
+ {
+ return new KubernetesClientBuilder().withConfig(config).build();
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
index 226121d5a5bb..812b34908ef2 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
@@ -25,13 +25,11 @@
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
-import org.apache.commons.io.input.ReaderInputStream;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import java.io.InputStream;
-import java.io.Reader;
-import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
@@ -78,7 +76,7 @@ public Pod launchJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit)
long start = System.currentTimeMillis();
// launch job
return clientApi.executeRequest(client -> {
- client.batch().v1().jobs().inNamespace(namespace).create(job);
+ client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
// wait until the pod is running or complete or failed, any of those is fine
@@ -106,13 +104,15 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.waitUntilCondition(
- x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null,
+ x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null
+ && (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() !=null),
howLong,
unit
);
if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job, PeonPhase.SUCCEEDED);
}
+ log.warn("Task %s failed with status %s", taskId, job.getStatus());
return new JobResponse(job, PeonPhase.FAILED);
});
}
@@ -121,12 +121,12 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit
public boolean cleanUpJob(K8sTaskId taskId)
{
if (!debugJobs) {
- Boolean result = clientApi.executeRequest(client -> client.batch()
- .v1()
- .jobs()
- .inNamespace(namespace)
- .withName(taskId.getK8sTaskId())
- .delete());
+ Boolean result = clientApi.executeRequest(client -> !client.batch()
+ .v1()
+ .jobs()
+ .inNamespace(namespace)
+ .withName(taskId.getK8sTaskId())
+ .delete().isEmpty());
if (result) {
log.info("Cleaned up k8s task: %s", taskId);
} else {
@@ -143,23 +143,24 @@ public boolean cleanUpJob(K8sTaskId taskId)
@Override
public Optional getPeonLogs(K8sTaskId taskId)
{
+ KubernetesClient k8sClient = clientApi.getClient();
try {
- return clientApi.executeRequest(client -> {
- Reader reader = client.batch()
- .v1()
- .jobs()
- .inNamespace(namespace)
- .withName(taskId.getK8sTaskId())
- .inContainer("main")
- .getLogReader();
- if (reader == null) {
- return Optional.absent();
- }
- return Optional.of(new ReaderInputStream(reader, StandardCharsets.UTF_8));
- });
+ LogWatch logWatch = k8sClient.batch()
+ .v1()
+ .jobs()
+ .inNamespace(namespace)
+ .withName(taskId.getK8sTaskId())
+ .inContainer("main")
+ .watchLog();
+ if (logWatch == null) {
+ k8sClient.close();
+ return Optional.absent();
+ }
+ return Optional.of(new LogWatchInputStream(k8sClient, logWatch));
}
catch (Exception e) {
log.error(e, "Error streaming logs from task: %s", taskId);
+ k8sClient.close();
return Optional.absent();
}
}
@@ -180,17 +181,17 @@ public List listAllPeonJobs()
public List listPeonPods(Set phases)
{
return listPeonPods().stream()
- .filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
- .collect(Collectors.toList());
+ .filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
+ .collect(Collectors.toList());
}
@Override
public List listPeonPods()
{
- PodList podList = clientApi.executeRequest(client -> client.pods().inNamespace(namespace))
- .withLabel(DruidK8sConstants.LABEL_KEY)
- .list();
- return podList.getItems();
+ return clientApi.executeRequest(client -> client.pods().inNamespace(namespace)
+ .withLabel(DruidK8sConstants.LABEL_KEY)
+ .list().getItems());
+
}
@Override
@@ -200,7 +201,12 @@ public int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit)
return clientApi.executeRequest(client -> {
List jobs = getJobsToCleanup(listAllPeonJobs(), howFarBack, timeUnit);
jobs.forEach(x -> {
- if (client.batch().v1().jobs().inNamespace(namespace).withName(x.getMetadata().getName()).delete()) {
+ if (!client.batch()
+ .v1()
+ .jobs()
+ .inNamespace(namespace)
+ .withName(x.getMetadata().getName())
+ .delete().isEmpty()) {
numDeleted.incrementAndGet();
}
});
@@ -254,5 +260,4 @@ Pod getMainJobPod(KubernetesClient client, K8sTaskId taskId)
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found");
}
}
-
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
index bfc87d7c8676..4bbecab7dd05 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java
@@ -19,6 +19,7 @@
package org.apache.druid.k8s.overlord.common;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -119,6 +120,7 @@ public Job fromTask(Task task) throws IOException
@Override
public Task toTask(Pod from) throws IOException
{
+ // all i have to do here is grab the main container...done
PodSpec podSpec = from.getSpec();
massageSpec(podSpec, "main");
List envVars = podSpec.getContainers().get(0).getEnv();
@@ -199,8 +201,19 @@ protected void setupPorts(Container mainContainer)
mainContainer.setPorts(Lists.newArrayList(httpsPort, tcpPort));
}
- protected void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
+ @VisibleForTesting
+ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
+ throws JsonProcessingException
{
+ // if the peon monitors are set, override the overlord's monitors (if set) with the peon monitors
+ if (!taskRunnerConfig.peonMonitors.isEmpty()) {
+ mainContainer.getEnv().removeIf(x -> "druid_monitoring_monitors".equals(x.getName()));
+ mainContainer.getEnv().add(new EnvVarBuilder()
+ .withName("druid_monitoring_monitors")
+ .withValue(mapper.writeValueAsString(taskRunnerConfig.peonMonitors))
+ .build());
+ }
+
mainContainer.getEnv().addAll(Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
@@ -234,7 +247,7 @@ protected Container setupMainContainer(
PeonCommandContext context,
long containerSize,
String taskContents
- )
+ ) throws JsonProcessingException
{
// prepend the startup task.json extraction command
List mainCommand = Lists.newArrayList("sh", "-c");
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesClientApi.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesClientApi.java
index 44b0464fcfa8..655e4435f205 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesClientApi.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesClientApi.java
@@ -19,8 +19,15 @@
package org.apache.druid.k8s.overlord.common;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
// Wraps all kubernetes api calls, to ensure you open and close connections properly
public interface KubernetesClientApi
{
T executeRequest(KubernetesExecutor executor) throws KubernetesResourceNotFoundException;
+
+ // use only when handling streams of data, example if you want to pass around an input stream from a pod,
+ // then you would call this instead of executeRequest as you would want to keep the connection open until you
+ // are done with the stream. Callers responsibility to clean up when using this method
+ KubernetesClient getClient();
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java
new file mode 100644
index 000000000000..3665e940f349
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This wraps the InputStream for k8s client
+ * When you call close on the stream, it will also close the open
+ * http connections and the client
+ */
+public class LogWatchInputStream extends InputStream
+{
+
+ private final KubernetesClient client;
+ private final LogWatch logWatch;
+
+ public LogWatchInputStream(KubernetesClient client, LogWatch logWatch)
+ {
+ this.client = client;
+ this.logWatch = logWatch;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return logWatch.getOutput().read();
+ }
+
+ @Override
+ public void close()
+ {
+ logWatch.close();
+ client.close();
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
index 72cd8d41ec66..f82a82d78b41 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java
@@ -59,6 +59,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
// must have a kind / minikube cluster installed and the image pushed to your repository
+@Disabled
public class DruidPeonClientIntegrationTest
{
private StartupLoggingConfig startupLoggingConfig;
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
index fe9775868edd..cc6c6fec4362 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java
@@ -22,11 +22,14 @@
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.api.client.util.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodSpec;
@@ -67,7 +70,7 @@ class K8sTaskAdapterTest
private final StartupLoggingConfig startupLoggingConfig;
private final TaskConfig taskConfig;
private final DruidNode node;
- private ObjectMapper jsonMapper;
+ private final ObjectMapper jsonMapper;
public K8sTaskAdapterTest()
{
@@ -253,4 +256,75 @@ void testNoPrimaryFound()
});
}
+ @Test
+ void testAddingMonitors() throws IOException
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ PeonCommandContext context = new PeonCommandContext(
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new File("/tmp/")
+ );
+ KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
+ config.namespace = "test";
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper
+ );
+ Task task = K8sTestUtils.getTask();
+ // no monitor in overlord, no monitor override
+ Container container = new ContainerBuilder()
+ .withName("container").build();
+ adapter.addEnvironmentVariables(container, context, task.toString());
+ assertFalse(
+ container.getEnv().stream().anyMatch(x -> x.getName().equals("druid_monitoring_monitors")),
+ "Didn't match, envs: " + Joiner.on(',').join(container.getEnv())
+ );
+
+ // we have an override, but nothing in the overlord
+ config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class);
+ adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper
+ );
+ adapter.addEnvironmentVariables(container, context, task.toString());
+ EnvVar env = container.getEnv()
+ .stream()
+ .filter(x -> x.getName().equals("druid_monitoring_monitors"))
+ .findFirst()
+ .get();
+ assertEquals(jsonMapper.writeValueAsString(config.peonMonitors), env.getValue());
+
+ // we override what is in the overlord
+ config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class);
+ adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper
+ );
+ container.getEnv().add(new EnvVarBuilder()
+ .withName("druid_monitoring_monitors")
+ .withValue(
+ "'[\"org.apache.druid.java.util.metrics.JvmMonitor\", "
+ + "\"org.apache.druid.server.metrics.TaskCountStatsMonitor\"]'")
+ .build());
+ adapter.addEnvironmentVariables(container, context, task.toString());
+ env = container.getEnv()
+ .stream()
+ .filter(x -> x.getName().equals("druid_monitoring_monitors"))
+ .findFirst()
+ .get();
+ assertEquals(jsonMapper.writeValueAsString(config.peonMonitors), env.getValue());
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
index 0f454102986d..503708f77430 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
@@ -27,6 +27,7 @@
import io.fabric8.kubernetes.api.model.PodSpecBuilder;
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.utils.Serialization;
import org.apache.commons.text.CharacterPredicates;
import org.apache.commons.text.RandomStringGenerator;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -142,4 +143,11 @@ public static Task getTask()
);
}
+ public static T fileToResource(String contents, Class type)
+ {
+ return Serialization.unmarshal(
+ MultiContainerTaskAdapter.class.getClassLoader().getResourceAsStream(contents),
+ type
+ );
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java
new file mode 100644
index 000000000000..ffb60fda99f2
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class LogWatchInputStreamTest
+{
+
+ @Test
+ void testFlow() throws IOException
+ {
+ LogWatch logWatch = mock(LogWatch.class);
+ InputStream inputStream = mock(InputStream.class);
+ when(inputStream.read()).thenReturn(1);
+ when(logWatch.getOutput()).thenReturn(inputStream);
+ KubernetesClient client = mock(KubernetesClient.class);
+ LogWatchInputStream stream = new LogWatchInputStream(client, logWatch);
+ int result = stream.read();
+ Assertions.assertEquals(1, result);
+ verify(inputStream, times(1)).read();
+ stream.close();
+ verify(logWatch, times(1)).close();
+ verify(client, times(1)).close();
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
index cba40518731a..06633bb4be5a 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java
@@ -45,6 +45,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
@EnableKubernetesMockClient(crud = true)
class MultiContainerTaskAdapterTest
@@ -100,7 +101,7 @@ public void setup()
public void testMultiContainerSupport() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
- Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml")).get();
+ Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
@@ -120,11 +121,7 @@ public void testMultiContainerSupport() throws IOException
new File("/tmp")
)
);
- Job expected = client.batch()
- .v1()
- .jobs()
- .load(this.getClass().getClassLoader().getResourceAsStream("expectedMultiContainerOutput.yaml"))
- .get();
+ Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutput.yaml", Job.class);
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
// this would never happen in real life, but for the jdk 17 tests this is a problem
@@ -150,7 +147,7 @@ public void testMultiContainerSupport() throws IOException
public void testMultiContainerSupportWithNamedContainer() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
- Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpecOrder.yaml")).get();
+ Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpecOrder.yaml", Pod.class);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
config.primaryContainerName = "primary";
@@ -166,29 +163,73 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, "primary");
Job actual = adapter.createJobFromPodSpec(
- spec,
- task,
- new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
- new ArrayList<>(),
- new File("/tmp")
- )
+ spec,
+ task,
+ new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
+ new ArrayList<>(),
+ new File("/tmp")
+ )
);
- Job expected = client.batch()
- .v1()
- .jobs()
- .load(this.getClass().getClassLoader().getResourceAsStream("expectedMultiContainerOutputOrder.yaml"))
- .get();
+ Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutputOrder.yaml", Job.class);
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
// this would never happen in real life, but for the jdk 17 tests this is a problem
// could be related to: https://bugs.openjdk.org/browse/JDK-8081450
actual.getSpec()
+ .getTemplate()
+ .getSpec()
+ .getContainers()
+ .get(0)
+ .getEnv()
+ .removeIf(x -> x.getName().equals("TASK_JSON"));
+ expected.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0)
.getEnv()
.removeIf(x -> x.getName().equals("TASK_JSON"));
+ Assertions.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testOverridingPeonMonitors() throws IOException
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ Pod pod = K8sTestUtils.fileToResource("podSpec.yaml", Pod.class);
+ KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
+ config.namespace = "test";
+ config.primaryContainerName = "primary";
+ config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class);
+ MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ jsonMapper);
+ NoopTask task = NoopTask.create("id", 1);
+ PodSpec spec = pod.getSpec();
+ K8sTaskAdapter.massageSpec(spec, config.primaryContainerName);
+ Job actual = adapter.createJobFromPodSpec(
+ spec,
+ task,
+ new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
+ new ArrayList<>(),
+ new File("/tmp")
+ )
+ );
+ Job expected = K8sTestUtils.fileToResource("expectedPodSpec.yaml", Job.class);
+
+ // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
+ // this would never happen in real life, but for the jdk 17 tests this is a problem
+ // could be related to: https://bugs.openjdk.org/browse/JDK-8081450
+ actual.getSpec()
+ .getTemplate()
+ .getSpec()
+ .getContainers()
+ .get(0)
+ .getEnv()
+ .removeIf(x -> x.getName().equals("TASK_JSON"));
expected.getSpec()
.getTemplate()
.getSpec()
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
index 9005aa086ae1..c165feded650 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapterTest.java
@@ -99,9 +99,7 @@ public void setup()
public void testSingleContainerSupport() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
- Pod pod = client.pods()
- .load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml"))
- .get();
+ Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
@@ -122,13 +120,8 @@ public void testSingleContainerSupport() throws IOException
new File("/tmp")
)
);
- Job expected = client.batch()
- .v1()
- .jobs()
- .load(this.getClass()
- .getClassLoader()
- .getResourceAsStream("expectedSingleContainerOutput.yaml"))
- .get();
+
+ Job expected = K8sTestUtils.fileToResource("expectedSingleContainerOutput.yaml", Job.class);
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
// this would never happen in real life, but for the jdk 17 tests this is a problem
// could be related to: https://bugs.openjdk.org/browse/JDK-8081450
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
index 2cd1538cd727..57be98251a9a 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
@@ -36,4 +36,10 @@ public T executeRequest(KubernetesExecutor executor) throws KubernetesRes
{
return executor.executeRequest(client);
}
+
+ @Override
+ public KubernetesClient getClient()
+ {
+ return client;
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
index 9b2b6a8e1f17..939879a4af2c 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
@@ -2,29 +2,50 @@ apiVersion: "batch/v1"
kind: "Job"
metadata:
annotations:
- task.id: "task_id"
+ task.id: "id"
tls.enabled: "false"
labels:
druid.k8s.peons: "true"
- name: "taskid"
+ name: "id"
spec:
- activeDeadlineSeconds: 3600
+ activeDeadlineSeconds: 14400
backoffLimit: 0
template:
metadata:
annotations:
- task.id: "task_id"
+ task.id: "id"
tls.enabled: "false"
labels:
druid.k8s.peons: "true"
spec:
containers:
- args:
- - "trap 'touch /usr/share/pod/done' EXIT; mkdir -p ${TASK_DIR}; echo ${TASK_JSON}\
- \ | base64 -d | gzip -d > ${TASK_DIR}/task.json; "
+ - "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6\
+ \ 1\""
command:
- - "sh"
+ - "/bin/sh"
- "-c"
+ env:
+ - name: "druid_monitoring_monitors"
+ value: '["org.apache.druid.java.util.metrics.JvmMonitor"]'
+ - name: "TASK_DIR"
+ value: "/tmp"
+ - name: "TASK_JSON"
+ value: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
+ - name: "JAVA_OPTS"
+ value: ""
+ - name: "druid_host"
+ valueFrom:
+ fieldRef:
+ fieldPath: "status.podIP"
+ - name: "HOSTNAME"
+ valueFrom:
+ fieldRef:
+ fieldPath: "metadata.name"
+ - name: "KUBEXIT_NAME"
+ value: "main"
+ - name: "KUBEXIT_GRAVEYARD"
+ value: "/graveyard"
image: "one"
name: "main"
ports:
@@ -37,30 +58,50 @@ spec:
resources:
limits:
cpu: "1000m"
- memory: "1000000000"
+ memory: "2400000000"
requests:
cpu: "1000m"
- memory: "1000000000"
+ memory: "2400000000"
volumeMounts:
- - mountPath: "/usr/share/pod"
- name: "peon-share"
- - command:
- - "tail -f /dev/null"
+ - mountPath: "/graveyard"
+ name: "graveyard"
+ - mountPath: "/kubexit"
+ name: "kubexit"
+ - args:
+ - "/kubexit/kubexit /bin/sh -c \"tail -f /dev/null\" || true"
+ command:
+ - "/bin/sh"
+ - "-c"
+ env:
+ - name: "KUBEXIT_NAME"
+ value: "sidecar"
+ - name: "KUBEXIT_GRAVEYARD"
+ value: "/graveyard"
+ - name: "KUBEXIT_DEATH_DEPS"
+ value: "main"
image: "two"
- lifecycle:
- postStart:
- exec:
- command:
- - "while ! test -f /usr/share/pod/done; do echo 'Waiting for the main\
- \ pod to finish...'; sleep 5; done; echo 'Agent pod finished, exiting';\
- \ exit 0"
name: "sidecar"
volumeMounts:
- - mountPath: "/usr/share/pod"
- name: "peon-share"
- readOnly: true
+ - mountPath: "/graveyard"
+ name: "graveyard"
+ - mountPath: "/kubexit"
+ name: "kubexit"
+ hostname: "id"
+ initContainers:
+ - command:
+ - "cp"
+ - "/bin/kubexit"
+ - "/kubexit/kubexit"
+ image: "karlkfi/kubexit:v0.3.2"
+ name: "kubexit"
+ volumeMounts:
+ - mountPath: "/kubexit"
+ name: "kubexit"
restartPolicy: "Never"
volumes:
+ - emptyDir:
+ medium: "Memory"
+ name: "graveyard"
- emptyDir: {}
- name: "peon-share"
- ttlSecondsAfterFinished: 7200
\ No newline at end of file
+ name: "kubexit"
+ ttlSecondsAfterFinished: 172800
diff --git a/licenses.yaml b/licenses.yaml
index 6b121c715a5b..9136269e05f7 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -901,7 +901,7 @@ name: kubernetes fabric java client
license_category: binary
module: extensions-contrib/kubernetes-overlord-extensions
license_name: Apache License version 2.0
-version: 5.12.2
+version: 6.4.1
libraries:
- io.fabric8: kubernetes-client