Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.tasklogs.TaskLogs;

@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
Expand Down Expand Up @@ -78,6 +79,7 @@ private void configureTaskLogs(Binder binder)
binder.bind(FileTaskLogs.class).in(LazySingleton.class);

binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogStreamer.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
Expand All @@ -31,6 +32,7 @@
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.netty.util.SuppressForbidden;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -44,6 +46,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
Expand All @@ -57,22 +60,24 @@
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.tasklogs.TaskLogs;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -101,28 +106,31 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner

protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
protected final TaskAdapter adapter;
protected final KubernetesPeonClient client;

private final ObjectMapper mapper;
private final KubernetesTaskRunnerConfig k8sConfig;
private final TaskQueueConfig taskQueueConfig;
private final TaskLogPusher taskLogPusher;
private final TaskLogs taskLogs;
private final ListeningExecutorService exec;
private final KubernetesPeonClient client;
private final HttpClient httpClient;


public KubernetesTaskRunner(
ObjectMapper mapper,
TaskAdapter adapter,
KubernetesTaskRunnerConfig k8sConfig,
TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher,
TaskLogs taskLogs,
KubernetesPeonClient client,
HttpClient httpClient
)
{
this.mapper = mapper;
this.adapter = adapter;
this.k8sConfig = k8sConfig;
this.taskQueueConfig = taskQueueConfig;
this.taskLogPusher = taskLogPusher;
this.taskLogs = taskLogs;
this.client = client;
this.httpClient = httpClient;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
Expand Down Expand Up @@ -178,20 +186,7 @@ public ListenableFuture<TaskStatus> run(Task task)
completedPhase = monitorJob(k8sTaskId);
}
}
TaskStatus status;
if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) {
status = TaskStatus.success(task.getId());
} else if (completedPhase.getJob() == null) {
status = TaskStatus.failure(
task.getId(),
"K8s Job for task disappeared before completion: " + k8sTaskId
);
} else {
status = TaskStatus.failure(
task.getId(),
"Task failed: " + k8sTaskId
);
}
TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
if (completedPhase.getJobDuration().isPresent()) {
status = status.withDuration(completedPhase.getJobDuration().get());
}
Expand All @@ -210,7 +205,7 @@ public ListenableFuture<TaskStatus> run(Task task)
if (logStream.isPresent()) {
FileUtils.copyInputStreamToFile(logStream.get(), log.toFile());
}
taskLogPusher.pushTaskLog(task.getId(), log.toFile());
taskLogs.pushTaskLog(task.getId(), log.toFile());
}
finally {
Files.deleteIfExists(log);
Expand Down Expand Up @@ -243,10 +238,31 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId)
);
}

private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) throws IOException
{
Optional<InputStream> maybeTaskStatusStream = taskLogs.streamTaskStatus(task.getOriginalTaskId());
if (maybeTaskStatusStream.isPresent()) {
String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(), StandardCharsets.UTF_8);
return mapper.readValue(taskStatus, TaskStatus.class);
} else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) {
// fallback to behavior before the introduction of task status streaming for backwards compatibility
return TaskStatus.success(task.getOriginalTaskId());
} else if (Objects.isNull(jobResponse.getJob())) {
return TaskStatus.failure(
task.getOriginalTaskId(),
StringUtils.format("Task [%s] failed kubernetes job disappeared before completion", task.getOriginalTaskId())
);
} else {
return TaskStatus.failure(
task.getOriginalTaskId(),
StringUtils.format("Task [%s] failed", task.getOriginalTaskId())
);
}
}

@Override
public void updateStatus(Task task, TaskStatus status)
{
log.info("Updating task: %s with status %s", task.getId(), status);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

Expand Down Expand Up @@ -508,8 +524,8 @@ public TaskLocation getLocation()
}
boolean tlsEnabled = Boolean.parseBoolean(
mainPod.getMetadata()
.getAnnotations()
.getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
.getAnnotations()
.getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
return TaskLocation.create(
mainPod.getStatus().getPodIP(),
DruidK8sConstants.PORT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.Locale;
import java.util.Properties;
Expand All @@ -54,7 +54,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private final StartupLoggingConfig startupLoggingConfig;
private final TaskQueueConfig taskQueueConfig;
private final TaskLogPusher taskLogPusher;
private final TaskLogs taskLogs;
private final DruidNode druidNode;
private final TaskConfig taskConfig;
private final Properties properties;
Expand All @@ -68,7 +68,7 @@ public KubernetesTaskRunnerFactory(
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
StartupLoggingConfig startupLoggingConfig,
@JacksonInject TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher,
TaskLogs taskLogs,
@Self DruidNode druidNode,
TaskConfig taskConfig,
Properties properties
Expand All @@ -80,7 +80,7 @@ public KubernetesTaskRunnerFactory(
this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
this.startupLoggingConfig = startupLoggingConfig;
this.taskQueueConfig = taskQueueConfig;
this.taskLogPusher = taskLogPusher;
this.taskLogs = taskLogs;
this.druidNode = druidNode;
this.taskConfig = taskConfig;
this.properties = properties;
Expand All @@ -100,10 +100,11 @@ public KubernetesTaskRunner build()
}

runner = new KubernetesTaskRunner(
smileMapper,
buildTaskAdapter(client),
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
httpClient
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -45,7 +45,7 @@ public class KubernetesTaskRunnerFactoryTest
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private StartupLoggingConfig startupLoggingConfig;
private TaskQueueConfig taskQueueConfig;
private TaskLogPusher taskLogPusher;
private TaskLogs taskLogs;
private DruidNode druidNode;
private TaskConfig taskConfig;
private Properties properties;
Expand All @@ -62,7 +62,7 @@ public void setup()
null,
null
);
taskLogPusher = new NoopTaskLogs();
taskLogs = new NoopTaskLogs();
druidNode = new DruidNode(
"test",
"",
Expand All @@ -85,7 +85,7 @@ public void test_get_returnsSameKuberentesTaskRunner_asBuild()
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
properties
Expand All @@ -106,7 +106,7 @@ public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSing
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
properties
Expand All @@ -129,7 +129,7 @@ public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiCo
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
properties
Expand All @@ -153,7 +153,7 @@ public void test_build_withSingleContainerAdapterType_returnsKubernetesTaskRunne
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand All @@ -179,7 +179,7 @@ public void test_build_withSingleContainerAdapterTypeAndSidecarSupport_throwsIAE
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand All @@ -206,7 +206,7 @@ public void test_build_withMultiContainerAdapterType_returnsKubernetesTaskRunner
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand All @@ -230,7 +230,7 @@ public void test_build_withMultiContainerAdapterTypeAndSidecarSupport_returnsKub
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand All @@ -257,7 +257,7 @@ public void test_build_withPodTemplateAdapterType_returnsKubernetesTaskRunnerWit
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
taskLogs,
druidNode,
taskConfig,
props
Expand Down
Loading