Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.tasklogs.TaskLogs;

@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
Expand Down Expand Up @@ -78,6 +79,7 @@ private void configureTaskLogs(Binder binder)
binder.bind(FileTaskLogs.class).in(LazySingleton.class);

binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogStreamer.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
Expand All @@ -31,6 +32,7 @@
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.netty.util.SuppressForbidden;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -44,6 +46,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
Expand All @@ -57,22 +60,24 @@
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.tasklogs.TaskLogs;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -101,28 +106,31 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner

protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
protected final TaskAdapter adapter;
protected final KubernetesPeonClient client;

private final ObjectMapper mapper;
private final KubernetesTaskRunnerConfig k8sConfig;
private final TaskQueueConfig taskQueueConfig;
private final TaskLogPusher taskLogPusher;
private final TaskLogs taskLogs;
private final ListeningExecutorService exec;
private final KubernetesPeonClient client;
private final HttpClient httpClient;


public KubernetesTaskRunner(
ObjectMapper mapper,
TaskAdapter adapter,
KubernetesTaskRunnerConfig k8sConfig,
TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher,
TaskLogs taskLogs,
KubernetesPeonClient client,
HttpClient httpClient
)
{
this.mapper = mapper;
this.adapter = adapter;
this.k8sConfig = k8sConfig;
this.taskQueueConfig = taskQueueConfig;
this.taskLogPusher = taskLogPusher;
this.taskLogs = taskLogs;
this.client = client;
this.httpClient = httpClient;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
Expand Down Expand Up @@ -178,20 +186,7 @@ public ListenableFuture<TaskStatus> run(Task task)
completedPhase = monitorJob(k8sTaskId);
}
}
TaskStatus status;
if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) {
status = TaskStatus.success(task.getId());
} else if (completedPhase.getJob() == null) {
status = TaskStatus.failure(
task.getId(),
"K8s Job for task disappeared before completion: " + k8sTaskId
);
} else {
status = TaskStatus.failure(
task.getId(),
"Task failed: " + k8sTaskId
);
}
TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
if (completedPhase.getJobDuration().isPresent()) {
status = status.withDuration(completedPhase.getJobDuration().get());
}
Expand All @@ -210,7 +205,7 @@ public ListenableFuture<TaskStatus> run(Task task)
if (logStream.isPresent()) {
FileUtils.copyInputStreamToFile(logStream.get(), log.toFile());
}
taskLogPusher.pushTaskLog(task.getId(), log.toFile());
taskLogs.pushTaskLog(task.getId(), log.toFile());
}
finally {
Files.deleteIfExists(log);
Expand Down Expand Up @@ -243,10 +238,31 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId)
);
}

private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) throws IOException
{
Optional<InputStream> maybeTaskStatusStream = taskLogs.streamTaskStatus(task.getOriginalTaskId());
if (maybeTaskStatusStream.isPresent()) {
String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(), StandardCharsets.UTF_8);
return mapper.readValue(taskStatus, TaskStatus.class);
} else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) {
// fallback to behavior before the introduction of task status streaming for backwards compatibility
return TaskStatus.success(task.getOriginalTaskId());
} else if (Objects.isNull(jobResponse.getJob())) {
return TaskStatus.failure(
task.getOriginalTaskId(),
StringUtils.format("Task [%s] failed kubernetes job disappeared before completion", task.getOriginalTaskId())
);
} else {
return TaskStatus.failure(
task.getOriginalTaskId(),
StringUtils.format("Task [%s] failed", task.getOriginalTaskId())
);
}
}

@Override
public void updateStatus(Task task, TaskStatus status)
{
log.info("Updating task: %s with status %s", task.getId(), status);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

Expand Down Expand Up @@ -508,8 +524,8 @@ public TaskLocation getLocation()
}
boolean tlsEnabled = Boolean.parseBoolean(
mainPod.getMetadata()
.getAnnotations()
.getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
.getAnnotations()
.getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
return TaskLocation.create(
mainPod.getStatus().getPodIP(),
DruidK8sConstants.PORT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.Locale;
import java.util.Properties;
Expand All @@ -54,7 +54,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private final StartupLoggingConfig startupLoggingConfig;
private final TaskQueueConfig taskQueueConfig;
private final TaskLogPusher taskLogPusher;
private final TaskLogs taskLogs;
private final DruidNode druidNode;
private final TaskConfig taskConfig;
private final Properties properties;
Expand All @@ -68,7 +68,7 @@ public KubernetesTaskRunnerFactory(
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
StartupLoggingConfig startupLoggingConfig,
@JacksonInject TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher,
TaskLogs taskLogs,
@Self DruidNode druidNode,
TaskConfig taskConfig,
Properties properties
Expand All @@ -80,7 +80,7 @@ public KubernetesTaskRunnerFactory(
this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
this.startupLoggingConfig = startupLoggingConfig;
this.taskQueueConfig = taskQueueConfig;
this.taskLogPusher = taskLogPusher;
this.taskLogs = taskLogs;
this.druidNode = druidNode;
this.taskConfig = taskConfig;
this.properties = properties;
Expand All @@ -100,10 +100,11 @@ public KubernetesTaskRunner build()
}

runner = new KubernetesTaskRunner(
smileMapper,
buildTaskAdapter(client),
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
httpClient
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -45,7 +45,7 @@ public class KubernetesTaskRunnerFactoryTest
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private StartupLoggingConfig startupLoggingConfig;
private TaskQueueConfig taskQueueConfig;
private TaskLogPusher taskLogPusher;
private TaskLogs taskLogs;
private DruidNode druidNode;
private TaskConfig taskConfig;
private Properties properties;
Expand All @@ -62,7 +62,7 @@ public void setup()
null,
null
);
taskLogPusher = new NoopTaskLogs();
taskLogs = new NoopTaskLogs();
druidNode = new DruidNode(
"test",
"",
Expand All @@ -85,7 +85,7 @@ public void test_get_returnsSameKuberentesTaskRunner_asBuild()
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
properties
Expand All @@ -106,7 +106,7 @@ public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSing
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
properties
Expand All @@ -129,7 +129,7 @@ public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiCo
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
properties
Expand All @@ -153,7 +153,7 @@ public void test_build_withSingleContainerAdapterType_returnsKubernetesTaskRunne
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand All @@ -179,7 +179,7 @@ public void test_build_withSingleContainerAdapterTypeAndSidecarSupport_throwsIAE
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand All @@ -206,7 +206,7 @@ public void test_build_withMultiContainerAdapterType_returnsKubernetesTaskRunner
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand All @@ -230,7 +230,7 @@ public void test_build_withMultiContainerAdapterTypeAndSidecarSupport_returnsKub
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand All @@ -257,7 +257,7 @@ public void test_build_withPodTemplateAdapterType_returnsKubernetesTaskRunnerWit
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand Down
Loading