Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ Druid selects the pod template `podSpecWithHighMemRequests.yaml`.
|`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No|
|`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long jobs stay around before getting reaped from K8s |`P2D`|No|
|`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No|
|`druid.indexer.runner.taskJoinTimeout`| `Duration` | Timeout for gathering metadata about existing tasks on startup |`PT1M`|No|
|`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed. On a resource-constrained cluster, launching may take some time. |`PT1H`|No|
|`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No|
|`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
Expand Down Expand Up @@ -89,6 +91,8 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
private final SettableFuture<Boolean> taskStartedSuccessfullyFuture;

@MonotonicNonNull
private LogWatch logWatch;

Expand All @@ -108,6 +112,7 @@ protected KubernetesPeonLifecycle(
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
this.taskStartedSuccessfullyFuture = SettableFuture.create();
}

/**
Expand Down Expand Up @@ -136,11 +141,13 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout,
launchTimeout,
TimeUnit.MILLISECONDS
);

return join(timeout);
}
catch (Exception e) {
log.info("Failed to run task: %s", taskId.getOriginalTaskId());
if (!taskStartedSuccessfullyFuture.isDone()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taskStartedSuccessfullyFuture is not set to true in run as far as I can tell. is that intentional?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run always calls into join (line 140) so it'll get set there

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I see, OK.

taskStartedSuccessfullyFuture.set(false);
}
throw e;
}
finally {
Expand Down Expand Up @@ -178,7 +185,7 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

taskStartedSuccessfullyFuture.set(true);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
Expand All @@ -187,14 +194,19 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio

return getTaskStatus(jobResponse.getJobDuration());
}
catch (Exception e) {
if (!taskStartedSuccessfullyFuture.isDone()) {
taskStartedSuccessfullyFuture.set(false);
}
throw e;
}
finally {
try {
saveLogs();
}
catch (Exception e) {
log.warn(e, "Log processing failed for task [%s]", taskId);
}

stopTask();
}
}
Expand Down Expand Up @@ -245,7 +257,10 @@ protected State getState()
protected TaskLocation getTaskLocation()
{
if (State.PENDING.equals(state.get()) || State.NOT_STARTED.equals(state.get())) {
log.debug("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId());
/* This should not actually ever happen because KubernetesTaskRunner.start() should not return until all running tasks
have already gone into State.RUNNING, so getTaskLocation should not be called.
*/
log.warn("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId());
return TaskLocation.unknown();
}

Expand All @@ -256,13 +271,18 @@ protected TaskLocation getTaskLocation()
if (taskLocation == null) {
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
/* Arguably we should throw an exception here, but we leave it as a warn log to prevent unexpected errors.
If there is strange behavior during overlord restarts, the operator should look for this warn log.
*/
log.warn("Could not get task location from k8s for task [%s].", taskId);
return TaskLocation.unknown();
}

Pod pod = maybePod.get();
PodStatus podStatus = pod.getStatus();

if (podStatus == null || podStatus.getPodIP() == null) {
log.warn("Could not get task location from k8s for task [%s].", taskId);
return TaskLocation.unknown();
}
taskLocation = TaskLocation.create(
Expand Down Expand Up @@ -378,4 +398,17 @@ private void updateState(State[] acceptedStates, State targetState)
);
stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
}

/**
* Returns the {@link ListenableFuture} that reports whether this lifecycle's task started successfully.
*
* <p>The future is completed with {@code true} by {@code join} right after the lifecycle state has been
* moved to {@code State.RUNNING}. It is completed with {@code false} when {@code run} or {@code join}
* catches an exception while the future is still not done; a future that is already done is left as is.
*
* @return a {@link ListenableFuture} resolving to {@code true} if the task started successfully, or
*         {@code false} if it failed before starting.
*/
protected ListenableFuture<Boolean> getTaskStartedSuccessfullyFuture()
{
return taskStartedSuccessfullyFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -55,12 +56,14 @@
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -146,16 +149,28 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(
task,
exec.submit(() -> runTask(task)),
peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
)
)).getResult();
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
protected KubernetesWorkItem joinAsync(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(
task,
exec.submit(() -> joinTask(task)),
peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
)
));
}
}

Expand All @@ -173,10 +188,7 @@ private TaskStatus joinTask(Task task)
protected TaskStatus doTask(Task task, boolean run)
{
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
);
KubernetesPeonLifecycle peonLifecycle;

synchronized (tasks) {
KubernetesWorkItem workItem = tasks.get(task.getId());
Expand All @@ -185,7 +197,7 @@ protected TaskStatus doTask(Task task, boolean run)
throw new ISE("Task [%s] has been shut down", task.getId());
}

workItem.setKubernetesPeonLifecycle(peonLifecycle);
peonLifecycle = workItem.getPeonLifeycle();
}

TaskStatus taskStatus;
Expand Down Expand Up @@ -321,16 +333,53 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
public void start()
{
log.info("Starting K8sTaskRunner...");
// Load tasks from previously running jobs and wait for their statuses to be updated asynchronously.
for (Job job : client.getPeonJobs()) {
// Load tasks from previously running jobs and wait for their statuses to start running.
final List<ListenableFuture<Boolean>> taskStatusActiveList = new ArrayList<>();
final List<Job> peonJobs = client.getPeonJobs();

log.info("Locating [%,d] active tasks.", peonJobs.size());
for (Job job : peonJobs) {
try {
joinAsync(adapter.toTask(job));
KubernetesWorkItem kubernetesWorkItem = joinAsync(adapter.toTask(job));
taskStatusActiveList.add(kubernetesWorkItem.getPeonLifeycle().getTaskStartedSuccessfullyFuture());
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
}
}
log.info("Loaded %,d tasks from previous run", tasks.size());

try {
final DateTime nowUtc = DateTimes.nowUtc();
final long timeoutMs = nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis();
if (timeoutMs > 0) {
FutureUtils.coalesce(taskStatusActiveList).get(timeoutMs, TimeUnit.MILLISECONDS);
}
log.info("Located [%,d] active tasks.", taskStatusActiveList.size());
}
catch (Exception e) {
final long numInitialized =
tasks.values()
.stream()
.filter(item -> {
if (item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()) {
try {
return item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().get();
}
catch (InterruptedException | ExecutionException ex) {
return false;
}
} else {
return false;
}
}).count();
log.warn(
e,
"Located [%,d] out of [%,d] active tasks (timeout = %s). Locating others asynchronously.",
numInitialized,
taskStatusActiveList.size(),
config.getTaskJoinTimeout()
);
}

cleanupExecutor.scheduleAtFixedRate(
() ->
Expand All @@ -342,7 +391,7 @@ public void start()
config.getTaskCleanupInterval().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
);
log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay());
log.info("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public class KubernetesTaskRunnerConfig
// interval for k8s job cleanup to run
private Period taskCleanupInterval = new Period("PT10m");

@JsonProperty
@NotNull
// how long to wait to join peon k8s jobs on startup
private Period taskJoinTimeout = new Period("PT1M");


@JsonProperty
@NotNull
// how long to wait for the peon k8s job to launch
Expand Down Expand Up @@ -140,7 +146,8 @@ private KubernetesTaskRunnerConfig(
int cpuCoreInMicro,
Map<String, String> labels,
Map<String, String> annotations,
Integer capacity
Integer capacity,
Period taskJoinTimeout
)
{
this.namespace = namespace;
Expand Down Expand Up @@ -181,6 +188,10 @@ private KubernetesTaskRunnerConfig(
k8sjobLaunchTimeout,
this.k8sjobLaunchTimeout
);
this.taskJoinTimeout = ObjectUtils.defaultIfNull(
taskJoinTimeout,
this.taskJoinTimeout
);
this.peonMonitors = ObjectUtils.defaultIfNull(
peonMonitors,
this.peonMonitors
Expand Down Expand Up @@ -247,6 +258,11 @@ public Period getTaskTimeout()
{
return maxTaskDuration;
}
/**
* Returns how long to wait to join peon k8s jobs on startup.
*
* @return the configured task join timeout; the backing field is initialized to {@code PT1M}.
*/
public Period getTaskJoinTimeout()
{
return taskJoinTimeout;
}


public Period getTaskCleanupDelay()
{
Expand Down Expand Up @@ -317,6 +333,7 @@ public static class Builder
private Map<String, String> labels;
private Map<String, String> annotations;
private Integer capacity;
private Period taskJoinTimeout;

public Builder()
{
Expand Down Expand Up @@ -425,6 +442,12 @@ public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
return this;
}

/**
* Sets how long to wait to join peon k8s jobs on startup.
*
* @param taskJoinTimeout the timeout to hand to the config constructor; a {@code null} value makes the
*                        constructor keep the config's own default for this field.
* @return this builder, so calls can be chained.
*/
public Builder withTaskJoinTimeout(Period taskJoinTimeout)
{
this.taskJoinTimeout = taskJoinTimeout;
return this;
}

public KubernetesTaskRunnerConfig build()
{
return new KubernetesTaskRunnerConfig(
Expand All @@ -444,7 +467,8 @@ public KubernetesTaskRunnerConfig build()
this.cpuCoreInMicro,
this.labels,
this.annotations,
this.capacity
this.capacity,
this.taskJoinTimeout
);
}
}
Expand Down
Loading