Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
b7f8422
Upgrade the fabric client to support newer versions of k8s, add abili…
churromorales Feb 14, 2023
ae4248f
Fix dependencies check in build
churromorales Feb 14, 2023
c77cb94
Have to remove the concrete dependency and keep the api
churromorales Feb 14, 2023
922afaa
Forgot to update the expected file in this commit
churromorales Feb 14, 2023
e655777
Adding code coverage, fixing some inspections
churromorales Feb 15, 2023
9d14e46
Made a mistake on which mocks to call verify on
churromorales Feb 15, 2023
9be2a23
Have to add the client back as a runtime dependency, otherwise we get…
churromorales Feb 15, 2023
4d76e5f
Last change and some log fixes to work with k8s 1.25
churromorales Feb 17, 2023
1e97768
Update docs/development/extensions-contrib/k8s-jobs.md
churromorales Feb 28, 2023
55875f2
Changes per PR review
churromorales Feb 28, 2023
4b836f3
Merge branch 'master' into k8s-mm-less-fixes
churromorales Mar 10, 2023
4e8bb9d
Update docs/development/extensions-contrib/k8s-jobs.md
churromorales Mar 10, 2023
c28d260
Small fix to make the config match the docs and fixed a typo in the t…
churromorales Mar 13, 2023
79c1e6b
Merge branch 'master' into k8s-mm-less-fixes
churromorales Mar 28, 2023
82e316a
Removing the un-needed code coverage happy test
churromorales Mar 28, 2023
2871150
Merge conflicts due to the PodTemplateTaskAdapter commit
churromorales Mar 29, 2023
de4378d
Merge remote-tracking branch 'upstream/master' into k8s-mm-less-fixes
Apr 4, 2023
12cce38
Fabric8 upgrade for PodTemplateTaskAdapter
Apr 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Additional Configuration
|`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No|
|`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.peonMonitors`|`JsonArray`|Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord.|`[]`|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No|

### Gotchas
Expand Down
14 changes: 10 additions & 4 deletions extensions-contrib/kubernetes-overlord-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
Expand All @@ -114,12 +114,18 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-batch</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client-api</artifactId>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
<scope>runtime</scope>
</dependency>

<!-- Tests -->
Expand All @@ -136,7 +142,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public ListenableFuture<TaskStatus> run(Task task)
} else {
status = TaskStatus.failure(
task.getId(),
"Task failed %s: " + k8sTaskId
"Task failed: " + k8sTaskId
);
}
if (completedPhase.getJobDuration().isPresent()) {
Expand Down Expand Up @@ -240,6 +240,7 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId)
@Override
public void updateStatus(Task task, TaskStatus status)
{
log.info("Updating task: %s with status %s", task.getId(), status);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this log line left in on purpose?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was part of the original PR, IMO this should be a debug log.

TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.joda.time.Period;

import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -87,9 +88,16 @@ public class KubernetesTaskRunnerConfig
// how long to wait for the peon k8s job to launch
public Period k8sjobLaunchTimeout = new Period("PT1H");

@JsonProperty
// ForkingTaskRunner inherits the monitors from the MM, in k8s mode
// the peon inherits the monitors from the overlord, so if someone specifies
// a TaskCountStatsMonitor in the overlord for example, the peon process
// fails because it can not inject this monitor in the peon process.
public List<String> peonMonitors = new ArrayList<>();

@JsonProperty
@NotNull
public List<String> javaOptsArray;
public List<String> javaOptsArray = new ArrayList<>();

@JsonProperty
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self;
Expand Down Expand Up @@ -90,7 +91,7 @@ public KubernetesTaskRunner build()
{
DruidKubernetesClient client;
if (kubernetesTaskRunnerConfig.disableClientProxy) {
Config config = Config.autoConfigure(null);
Config config = new ConfigBuilder().build();
config.setHttpsProxy(null);
config.setHttpProxy(null);
client = new DruidKubernetesClient(config);
Expand Down Expand Up @@ -143,7 +144,6 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
druidNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class DruidKubernetesClient implements KubernetesClientApi
{
Expand All @@ -30,7 +31,7 @@ public class DruidKubernetesClient implements KubernetesClientApi

/**
 * Creates a client using fabric8's default auto-detected configuration
 * (kubeconfig file or in-cluster service account).
 */
public DruidKubernetesClient()
{
  // ConfigBuilder().build() performs auto-configuration by default in fabric8 6.x,
  // replacing the deprecated Config.autoConfigure(null) from 5.x.
  this(new ConfigBuilder().build());
}

public DruidKubernetesClient(Config config)
Expand All @@ -41,8 +42,14 @@ public DruidKubernetesClient(Config config)
@Override
public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException
{
try (KubernetesClient client = new DefaultKubernetesClient(config)) {
try (KubernetesClient client = getClient()) {
return executor.executeRequest(client);
}
}

/**
 * Returns a NEW client instance on every call, built from this object's Config.
 * The caller owns the returned client and is responsible for closing it
 * (see the contract note on KubernetesClientApi); it is NOT closed by this class.
 */
@Override
public KubernetesClient getClient()
{
return new KubernetesClientBuilder().withConfig(config).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.io.input.ReaderInputStream;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;

import java.io.InputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -78,7 +76,7 @@ public Pod launchJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit)
long start = System.currentTimeMillis();
// launch job
return clientApi.executeRequest(client -> {
client.batch().v1().jobs().inNamespace(namespace).create(job);
client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
// wait until the pod is running or complete or failed, any of those is fine
Expand Down Expand Up @@ -106,7 +104,8 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.waitUntilCondition(
x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null),
x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null
&& (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null)),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes sense to me but what was the reasoning for adding this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More context here

howLong,
unit
);
Expand All @@ -116,6 +115,7 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit
if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job, PeonPhase.SUCCEEDED);
}
log.warn("Task %s failed with status %s", taskId, job.getStatus());
return new JobResponse(job, PeonPhase.FAILED);
});
}
Expand All @@ -124,12 +124,12 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit
public boolean cleanUpJob(K8sTaskId taskId)
{
if (!debugJobs) {
Boolean result = clientApi.executeRequest(client -> client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.delete());
Boolean result = clientApi.executeRequest(client -> !client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.delete().isEmpty());
if (result) {
log.info("Cleaned up k8s task: %s", taskId);
} else {
Expand All @@ -146,23 +146,24 @@ public boolean cleanUpJob(K8sTaskId taskId)
/**
 * Opens a live log stream for the "main" container of the peon job backing the
 * given task.
 *
 * @param taskId the druid task whose peon logs to stream
 * @return an InputStream over the container logs, or absent if the logs are
 *         unavailable or an error occurs while opening the watch
 */
@Override
public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
{
  // Deliberately NOT try-with-resources: the client must stay open for as long
  // as the caller reads the returned stream. LogWatchInputStream closes both the
  // watch and the client when the stream itself is closed; on every failure path
  // below we close the client ourselves so nothing leaks.
  KubernetesClient k8sClient = clientApi.getClient();
  try {
    LogWatch logWatch = k8sClient.batch()
                                 .v1()
                                 .jobs()
                                 .inNamespace(namespace)
                                 .withName(taskId.getK8sTaskId())
                                 .inContainer("main")
                                 .watchLog();
    if (logWatch == null) {
      k8sClient.close();
      return Optional.absent();
    }
    return Optional.of(new LogWatchInputStream(k8sClient, logWatch));
  }
  catch (Exception e) {
    log.error(e, "Error streaming logs from task: %s", taskId);
    k8sClient.close();
    return Optional.absent();
  }
}
Expand All @@ -183,17 +184,17 @@ public List<Job> listAllPeonJobs()
public List<Pod> listPeonPods(Set<PeonPhase> phases)
{
return listPeonPods().stream()
.filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
.collect(Collectors.toList());
.filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
.collect(Collectors.toList());
}

/**
 * Lists every pod in the configured namespace that carries the druid peon label.
 *
 * @return all labeled peon pods (possibly empty)
 */
@Override
public List<Pod> listPeonPods()
{
  // The whole chain (query + list + item extraction) runs inside executeRequest
  // so the client is closed before the fully-materialized item list is returned.
  return clientApi.executeRequest(client -> client.pods()
                                                  .inNamespace(namespace)
                                                  .withLabel(DruidK8sConstants.LABEL_KEY)
                                                  .list()
                                                  .getItems());
}

@Override
Expand All @@ -203,7 +204,12 @@ public int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit)
return clientApi.executeRequest(client -> {
List<Job> jobs = getJobsToCleanup(listAllPeonJobs(), howFarBack, timeUnit);
jobs.forEach(x -> {
if (client.batch().v1().jobs().inNamespace(namespace).withName(x.getMetadata().getName()).delete()) {
if (!client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(x.getMetadata().getName())
.delete().isEmpty()) {
numDeleted.incrementAndGet();
}
});
Expand Down Expand Up @@ -257,5 +263,4 @@ Pod getMainJobPod(KubernetesClient client, K8sTaskId taskId)
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.k8s.overlord.common;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -119,6 +120,7 @@ public Job fromTask(Task task) throws IOException
@Override
public Task toTask(Pod from) throws IOException
{
// all i have to do here is grab the main container...done
PodSpec podSpec = from.getSpec();
massageSpec(podSpec, "main");
List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
Expand Down Expand Up @@ -199,8 +201,19 @@ protected void setupPorts(Container mainContainer)
mainContainer.setPorts(Lists.newArrayList(httpsPort, tcpPort));
}

protected void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
@VisibleForTesting
void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
throws JsonProcessingException
{
// if the peon monitors are set, override the overlord's monitors (if set) with the peon monitors
if (!taskRunnerConfig.peonMonitors.isEmpty()) {
mainContainer.getEnv().removeIf(x -> "druid_monitoring_monitors".equals(x.getName()));
mainContainer.getEnv().add(new EnvVarBuilder()
.withName("druid_monitoring_monitors")
.withValue(mapper.writeValueAsString(taskRunnerConfig.peonMonitors))
.build());
}

mainContainer.getEnv().addAll(Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
Expand Down Expand Up @@ -234,7 +247,7 @@ protected Container setupMainContainer(
PeonCommandContext context,
long containerSize,
String taskContents
)
) throws JsonProcessingException
{
// prepend the startup task.json extraction command
List<String> mainCommand = Lists.newArrayList("sh", "-c");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@

package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.KubernetesClient;

// Wraps all kubernetes api calls, to ensure you open and close connections properly
public interface KubernetesClientApi
{
/**
 * Runs the given executor with a managed client: the connection is opened for
 * the duration of the call and closed when the executor returns. Prefer this
 * method for all request/response style API calls.
 *
 * @param executor the request to run against the Kubernetes API
 * @return whatever the executor produces
 * @throws KubernetesResourceNotFoundException if the executor reports a missing resource
 */
<T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException;

/**
 * Returns an unmanaged client. Use only when handling streams of data — for
 * example, when passing around an input stream from a pod — where the connection
 * must stay open until the stream has been fully consumed, so executeRequest's
 * close-on-return lifecycle would cut the stream short. It is the caller's
 * responsibility to close the returned client.
 */
KubernetesClient getClient();
}
Loading