Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7a86726
Separate out task logs
Aug 17, 2023
151b5c0
working with cleaner configs
Aug 17, 2023
455dfcf
Remove unneeded changes
Aug 17, 2023
6dd5713
Working with new configs
Aug 18, 2023
4823adc
Merge branch 'master' of github.com:georgew5656/druid into saveTaskLogs
Aug 18, 2023
14650c9
Pulling remote changes in
Aug 18, 2023
378a472
Fixing checkstyle
Aug 18, 2023
12374be
Cleanup unit tests
Aug 20, 2023
ea8a37b
fix checkstyle
Aug 21, 2023
39c62ac
Add more unit tests
Aug 21, 2023
ca17d62
Clean up check failures
Aug 21, 2023
bf21854
PR changes
Aug 23, 2023
be50e45
Fix spellign errors
Aug 23, 2023
3a59f3f
Fix spacing in docs
Aug 28, 2023
3bc19de
Don't overwrite table format
Aug 28, 2023
4b05f39
don't fix table format
Aug 28, 2023
ffcf0a7
Rename config
Sep 13, 2023
01989d7
Fix merge conflicts
Sep 13, 2023
baf9c75
fix merge conflicts
Sep 13, 2023
a74aad7
Remove config options
Sep 15, 2023
bbe7444
Small fixes
Sep 15, 2023
f3f8aac
Remove uneeded param
Sep 18, 2023
7760688
fix build
Sep 18, 2023
519b046
Merge branch 'master' of github.com:georgew5656/druid into useTaskMan…
Sep 19, 2023
4afa8f3
remove unneeded arg
Sep 19, 2023
0af80bf
Remove unused import
Sep 19, 2023
ba82cb1
PR changes
Sep 22, 2023
f200439
more pr changes
Sep 25, 2023
9f171fe
Merge branch 'master' into useTaskManagerForTaskPayload
Sep 27, 2023
59dc958
More pr changes
Sep 28, 2023
6123a3e
Fix static checks
Sep 29, 2023
99b68f0
Update extensions-contrib/kubernetes-overlord-extensions/src/main/jav…
georgew5656 Sep 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions distribution/docker/peon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ then
mkdir -p ${DRUID_DIRS_TO_CREATE}
fi

# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json
mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;
# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json.
# If TASK_JSON is not set, CliPeon will pull the task.json file from deep storage.
mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;

exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -71,9 +72,9 @@ public interface TaskStateListener

protected enum State
{
/** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */
/** Lifecycle's state before {@link #run(Job, long, long, boolean)} or {@link #join(long)} is called. */
NOT_STARTED,
/** Lifecycle's state since {@link #run(Job, long, long)} is called. */
/** Lifecycle's state since {@link #run(Job, long, long, boolean)} is called. */
PENDING,
/** Lifecycle's state since {@link #join(long)} is called. */
RUNNING,
Expand All @@ -88,7 +89,6 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;

@MonotonicNonNull
private LogWatch logWatch;

Expand Down Expand Up @@ -119,11 +119,15 @@ protected KubernetesPeonLifecycle(
* @return
* @throws IllegalStateException
*/
protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, boolean useDeepStorageForTaskPayload) throws IllegalStateException, IOException
{
try {
updateState(new State[]{State.NOT_STARTED}, State.PENDING);

if (useDeepStorageForTaskPayload) {
writeTaskPayload(task);
}

// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
Expand All @@ -144,6 +148,25 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
}
}

private void writeTaskPayload(Task task) throws IOException
{
Path file = null;
try {
Comment thread
abhishekagarwal87 marked this conversation as resolved.
file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset());
taskLogs.pushTaskPayload(task.getId(), file.toFile());
}
catch (Exception e) {
log.error("Failed to write task payload for task: %s", taskId.getOriginalTaskId());
throw new RuntimeException(e);
}
finally {
if (file != null) {
Files.deleteIfExists(file);
}
}
}

/**
* Join existing Kubernetes Job
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ protected TaskStatus doTask(Task task, boolean run)
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
config.getTaskTimeout().toStandardDuration().getMillis()
config.getTaskTimeout().toStandardDuration().getMillis(),
adapter.shouldUseDeepStorageForTaskPayload(task)
);
} else {
taskStatus = peonLifecycle.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;


@Inject
public KubernetesTaskRunnerFactory(
@Smile ObjectMapper smileMapper,
Expand Down Expand Up @@ -137,15 +136,17 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
smileMapper,
taskLogs
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
kubernetesTaskRunnerConfig,
taskConfig,
druidNode,
smileMapper,
properties
properties,
taskLogs
);
} else {
return new SingleContainerTaskAdapter(
Expand All @@ -154,7 +155,8 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
smileMapper,
taskLogs
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ public class DruidK8sConstants
public static final String DRUID_HOSTNAME_ENV = "HOSTNAME";
public static final String LABEL_KEY = "druid.k8s.peons";
public static final String DRUID_LABEL_PREFIX = "druid.";
public static final long MAX_ENV_VARIABLE_KBS = 130048; // 127 KB
static final Predicate<Throwable> IS_TRANSIENT = e -> e instanceof KubernetesResourceNotFoundException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InternalServerError;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
Expand All @@ -57,8 +60,11 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -89,14 +95,16 @@ public abstract class K8sTaskAdapter implements TaskAdapter
protected final StartupLoggingConfig startupLoggingConfig;
protected final DruidNode node;
protected final ObjectMapper mapper;
protected final TaskLogs taskLogs;

public K8sTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode node,
ObjectMapper mapper
ObjectMapper mapper,
TaskLogs taskLogs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this class has been confusing, since it does so much more than deal with task logs. It could be fixed in some other PR someday.

)
{
this.client = client;
Expand All @@ -105,6 +113,7 @@ public K8sTaskAdapter(
this.startupLoggingConfig = startupLoggingConfig;
this.node = node;
this.mapper = mapper;
this.taskLogs = taskLogs;
}

@Override
Expand Down Expand Up @@ -132,11 +141,39 @@ public Task toTask(Job from) throws IOException
Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
if (contents == null) {
throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName());
log.info("No TASK_JSON environment variable found in pod: %s. Trying to load task payload from deep storage.", from.getMetadata().getName());
return toTaskUsingDeepStorage(from);
}
return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
}

/**
 * Reads the task payload for the given job through {@code taskLogs.streamTaskPayload}
 * and deserializes it into a {@link Task}. Used when the job carries no TASK_JSON
 * environment variable.
 *
 * @param from the Kubernetes job whose task payload should be loaded
 * @return the deserialized task
 * @throws IOException if the payload stream cannot be read or parsed
 */
private Task toTaskUsingDeepStorage(Job from) throws IOException
{
  com.google.common.base.Optional<InputStream> taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
  if (!taskBody.isPresent()) {
    throw InternalServerError.exception(
        "Could not load task payload from deep storage for job [%s]. Check the overlord logs for any errors in uploading task payload to deep storage.",
        from.getMetadata().getName()
    );
  }
  // IOUtils.toString does not close the stream it reads, so close it here to avoid
  // leaking the underlying deep storage connection or file handle.
  try (InputStream payloadStream = taskBody.get()) {
    // JSON is exchanged as UTF-8; do not depend on the JVM's platform default charset.
    String task = IOUtils.toString(payloadStream, java.nio.charset.StandardCharsets.UTF_8);
    return mapper.readValue(task, Task.class);
  }
}

/**
 * Extracts the task id from the annotations on the job's pod template.
 *
 * @param from the Kubernetes job to inspect
 * @return a {@link K8sTaskId} built from the value of the {@code DruidK8sConstants.TASK_ID} annotation
 * @throws DruidException (defensive) if the pod template has no annotations or no task id annotation
 */
@Override
public K8sTaskId getTaskId(Job from)
{
  final Map<String, String> podAnnotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
  if (podAnnotations != null) {
    final String annotatedTaskId = podAnnotations.get(DruidK8sConstants.TASK_ID);
    if (annotatedTaskId != null) {
      return new K8sTaskId(annotatedTaskId);
    }
    // Annotations exist, but the task id entry is missing.
    throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName());
  }
  // The pod template carries no annotations at all.
  throw DruidException.defensive().build("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
}

@VisibleForTesting
abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException;

Expand Down Expand Up @@ -219,15 +256,11 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
.build());
}

mainContainer.getEnv().addAll(Lists.newArrayList(
List<EnvVar> envVars = Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(context.getTaskDir().getAbsolutePath())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValue(taskContents)
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.JAVA_OPTS)
.withValue(Joiner.on(" ").join(context.getJavaOpts()))
Expand All @@ -244,7 +277,17 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
null,
"metadata.name"
)).build()).build()
));
);

if (taskContents.length() < DruidK8sConstants.MAX_ENV_VARIABLE_KBS) {
envVars.add(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValue(taskContents)
.build()
);
}
mainContainer.getEnv().addAll(envVars);
}

protected Container setupMainContainer(
Expand Down Expand Up @@ -403,6 +446,9 @@ private List<String> generateCommand(Task task)
command.add("--loadBroadcastSegments");
command.add("true");
}

command.add("--taskId");
command.add(task.getId());
log.info(
"Peon Command for K8s job: %s",
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
Expand Down Expand Up @@ -433,5 +479,12 @@ static ResourceRequirements getResourceRequirements(ResourceRequirements require
}
return requirements;
}

/**
 * Decides whether the task payload must be passed through deep storage rather than
 * through the TASK_JSON environment variable.
 *
 * The check is the exact complement of the one in {@code addEnvironmentVariables}, which only
 * sets TASK_JSON when the compressed payload length is strictly less than
 * {@link DruidK8sConstants#MAX_ENV_VARIABLE_KBS}. A payload whose length equals the limit is
 * therefore routed to deep storage; otherwise it would be delivered through neither channel.
 *
 * @param task the task whose payload size is checked
 * @return true if the compressed, base64-encoded payload is at or above the environment variable limit
 * @throws IOException if the task cannot be serialized
 */
@Override
public boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException
{
  String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task));
  // ">=" (not ">") so that this stays the complement of the "<" check guarding TASK_JSON.
  return compressedTaskPayload.length() >= DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -59,10 +60,11 @@ public MultiContainerTaskAdapter(
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
ObjectMapper mapper
ObjectMapper mapper,
TaskLogs taskLogs
)
{
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper);
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs);
}

@Override
Expand Down
Loading