diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5336ce09578b..c9099d02e2e8 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -75,5 +75,25 @@
test
+  <profiles>
+    <profile>
+      <id>default-tools.jar</id>
+      <activation>
+        <property>
+          <name>java.vendor</name>
+          <value>Oracle Corporation</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>com.sun</groupId>
+          <artifactId>tools</artifactId>
+          <version>${java.version}</version>
+          <scope>system</scope>
+          <systemPath>${java.home}/../lib/tools.jar</systemPath>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
index 8239e8590877..d7008adad01d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
@@ -19,25 +19,37 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CharMatcher;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Closer;
-import com.google.common.io.Files;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
+import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.ClientResponse;
+import com.metamx.http.client.response.HttpResponseHandler;
+import com.sun.tools.attach.AttachNotSupportedException;
+import com.sun.tools.attach.VirtualMachine;
+import com.sun.tools.attach.VirtualMachineDescriptor;
+import io.druid.concurrent.Execs;
+import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
@@ -49,38 +61,73 @@
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.io.FileUtils;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchService;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
+import java.util.Scanner;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Runs tasks in separate processes using the "internal peon" verb.
+ *
+ * The peon is responsible for creating the task.port file in its attempt directory when it starts, and deleting it during shutdown. It is a simple way to signify if the task is still running.
+ *
+ * It also allows searching through the running JVMs for one that has a port set to this value to find the JVM instance.
*/
public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
{
+ public static final String TASKID_PROPERTY = "io.druid.indexing.worker.taskid";
+ private static final String PORT_FILE_NAME = "task.port";
+ private static final String LOG_FILE_NAME = "task.log";
+ private static final String TASK_FILE_NAME = "task.json";
+ private static final String STATUS_FILE_NAME = "status.json";
private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
private static final Splitter whiteSpaceSplitter = Splitter.on(CharMatcher.WHITESPACE).omitEmptyStrings();
+ private static final int MAX_DELETE_RETRIES = 3; // How many times should we try to delete the attempt dir on cleanup
private final ForkingTaskRunnerConfig config;
private final TaskConfig taskConfig;
+ private final WorkerConfig workerConfig;
private final Properties props;
private final TaskLogPusher taskLogPusher;
private final DruidNode node;
- private final ListeningExecutorService exec;
private final ObjectMapper jsonMapper;
private final PortFinder portFinder;
-
- private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newHashMap();
+ private final HttpClient httpClient;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private final ConcurrentMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
+ protected final ListeningExecutorService exec; // protected for unit tests
@Inject
public ForkingTaskRunner(
@@ -90,276 +137,626 @@ public ForkingTaskRunner(
Properties props,
TaskLogPusher taskLogPusher,
ObjectMapper jsonMapper,
- @Self DruidNode node
+ @Self DruidNode node,
+ @Global HttpClient httpClient
)
{
this.config = config;
this.taskConfig = taskConfig;
+ this.workerConfig = workerConfig;
this.props = props;
this.taskLogPusher = taskLogPusher;
this.jsonMapper = jsonMapper;
this.node = node;
this.portFinder = new PortFinder(config.getStartPort());
+ this.httpClient = httpClient;
+ this.exec = MoreExecutors.listeningDecorator(
+ Execs.multiThreaded(
+ workerConfig.getCapacity(),
+ "ForkingTaskWatcherExecutor-%d"
+ )
+ );
+ }
+
+ private ListenableFuture<TaskStatus> attach(
+ final String taskId,
+ final CountDownLatch leaderLatch,
+ ListeningExecutorService exec
+ )
+ {
+ final AtomicReference<ForkingTaskRunnerWorkItem> workItemAtomicReference = new AtomicReference<>(null);
+ final ListenableFuture<TaskStatus> future = exec.submit(
+ new Callable<TaskStatus>()
+ {
+ @Override
+ public TaskStatus call() throws Exception
+ {
+ if (leaderLatch != null) {
+ leaderLatch.await();
+ }
+ final ForkingTaskRunnerWorkItem workItem = tasks.get(taskId);
+ if (workItem == null) {
+ throw new NullPointerException(String.format("Task [%s] not found", taskId));
+ }
+ workItemAtomicReference.set(workItem);
+ final ProcessHolder processHolder = workItem.processHolder.get();
+ if (processHolder == null) {
+ throw new NullPointerException(String.format("Task [%s] has no process holder, cannot attach!", taskId));
+ }
+
+ processHolder.awaitShutdown(Long.MAX_VALUE);
+ final File statusFile = new File(
+ getTaskAttemptDir(processHolder.taskId, processHolder.attemptId),
+ STATUS_FILE_NAME
+ );
+ if (statusFile.exists() && statusFile.length() > 0) {
+ final TaskStatus status = jsonMapper.readValue(statusFile, TaskStatus.class);
+ log.info("Task [%s] exited with status [%s]", processHolder.taskId, status);
+ return TaskStatus.fromCode(processHolder.taskId, status.getStatusCode());
+ } else {
+ log.warn("Unable to find status file at [%s]. Reporting as failed", statusFile);
+ return TaskStatus.failure(processHolder.taskId);
+ }
+ }
+ }
+ );
+ Futures.addCallback(
+ future,
+ new FutureCallback<TaskStatus>()
+ {
+ @Override
+ public void onSuccess(TaskStatus result)
+ {
+ // Success of retrieving task status, not success of task
+ final ForkingTaskRunnerWorkItem workItem = workItemAtomicReference.get();
+ final ProcessHolder processHolder = workItem.processHolder.get();
+ uploadLogAndCleanDir(taskId, processHolder.attemptId);
+ portFinder.markPortUnused(processHolder.port);
+ if (!tasks.remove(taskId, workItem)) {
+ log.error("Task state corrupted, work items did not match for [%s] when cleaning up", taskId);
+ }
+ }
- this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workerConfig.getCapacity()));
+ @Override
+ public void onFailure(Throwable t)
+ {
+ final ForkingTaskRunnerWorkItem workItem = workItemAtomicReference.get();
+ if (workItem == null) {
+ if (t instanceof CancellationException) {
+ log.debug("Task [%s] did not have work item set. Probably didn't win leader election", taskId);
+ } else {
+ log.error(t, "Error in attaching to task [%s]", taskId);
+ }
+ return;
+ }
+ final ProcessHolder processHolder = workItem.processHolder.get();
+ if (processHolder == null) {
+ log.error("Task [%s] has no process holder, cannot attach!", taskId);
+ return;
+ }
+ try {
+ portFinder.markPortUnused(processHolder.port);
+ if(t instanceof InterruptedException) {
+ log.info("Task watcher for [%s] was interrupted", processHolder);
+ } else {
+ log.error(t, "Task watcher for [%s] had an error on attaching", processHolder);
+ }
+ }
+ finally {
+ if (!tasks.remove(processHolder.taskId, workItem)) {
+ log.warn("work item didn't match entry in tasks for [%s]", processHolder.taskId);
+ }
+ }
+ }
+ },
+ exec
+ );
+ return future;
+ }
+
+ private void uploadLogAndCleanDir(String taskId, String attemptId)
+ {
+ final File taskAttemptDir = getTaskAttemptDir(taskId, attemptId);
+ final File taskDir = getTaskDir(taskId);
+ final File logFile = getLogFile(taskId, attemptId);
+ // Success of retrieving TaskStatus, not success of task
+ try {
+ taskLogPusher.pushTaskLog(taskId, logFile);
+ int remainingTries = MAX_DELETE_RETRIES;
+ while (taskAttemptDir.exists() && remainingTries-- > 0) {
+ try {
+ FileUtils.deleteDirectory(taskAttemptDir);
+ log.debug("Cleaned up [%s]", taskAttemptDir);
+ }
+ // IOException on race condition on deleting dir, IAE if dir is eliminated between exists check and deleteDirectory's exists check
+ catch (IOException | IllegalArgumentException ex) {
+ log.debug(ex, "Error cleaning up files at [%s]", taskAttemptDir);
+ }
+ }
+ if (taskAttemptDir.exists()) {
+ log.error("Could not cleanup directory [%s]", taskAttemptDir);
+ }
+ if (!taskDir.delete()) {
+ log.debug("Could not clear task directory [%s]", taskDir);
+ }
+ }
+ catch (IOException ex) {
+ log.error(ex, "Error pushing log file [%s]", logFile);
+ }
}
@Override
 public ListenableFuture<TaskStatus> run(final Task task)
{
- synchronized (tasks) {
- if (!tasks.containsKey(task.getId())) {
- tasks.put(
- task.getId(),
- new ForkingTaskRunnerWorkItem(
- task.getId(),
- exec.submit(
- new Callable<TaskStatus>()
- {
- @Override
- public TaskStatus call()
- {
- final String attemptUUID = UUID.randomUUID().toString();
- final File taskDir = new File(taskConfig.getBaseTaskDir(), task.getId());
- final File attemptDir = new File(taskDir, attemptUUID);
-
- final ProcessHolder processHolder;
- final int childPort = portFinder.findUnusedPort();
- try {
- final Closer closer = Closer.create();
- try {
- if (!attemptDir.mkdirs()) {
- throw new IOException(String.format("Could not create directories: %s", attemptDir));
- }
-
- final File taskFile = new File(attemptDir, "task.json");
- final File statusFile = new File(attemptDir, "status.json");
- final File logFile = new File(attemptDir, "log");
-
- // time to adjust process holders
- synchronized (tasks) {
- final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
-
- if (taskWorkItem.shutdown) {
- throw new IllegalStateException("Task has been shut down!");
- }
-
- if (taskWorkItem == null) {
- log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
- throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
- }
-
- if (taskWorkItem.processHolder != null) {
- log.makeAlert("WTF?! TaskInfo already has a processHolder")
- .addData("task", task.getId())
- .emit();
- throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
- }
-
- final List<String> command = Lists.newArrayList();
- final String childHost = node.getHost();
- final String taskClasspath;
- if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
- taskClasspath = Joiner.on(File.pathSeparator).join(
- task.getClasspathPrefix(),
- config.getClasspath()
- );
- } else {
- taskClasspath = config.getClasspath();
- }
-
- command.add(config.getJavaCommand());
- command.add("-cp");
- command.add(taskClasspath);
-
- Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
-
- for (String propName : props.stringPropertyNames()) {
- for (String allowedPrefix : config.getAllowedPrefixes()) {
- if (propName.startsWith(allowedPrefix)) {
- command.add(
- String.format(
- "-D%s=%s",
- propName,
- props.getProperty(propName)
- )
- );
- }
- }
- }
-
- // Override child JVM specific properties
- for (String propName : props.stringPropertyNames()) {
- if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
- command.add(
- String.format(
- "-D%s=%s",
- propName.substring(CHILD_PROPERTY_PREFIX.length()),
- props.getProperty(propName)
- )
- );
- }
- }
-
- command.add(String.format("-Ddruid.host=%s", childHost));
- command.add(String.format("-Ddruid.port=%d", childPort));
-
- command.add("io.druid.cli.Main");
- command.add("internal");
- command.add("peon");
- command.add(taskFile.toString());
- command.add(statusFile.toString());
- String nodeType = task.getNodeType();
- if (nodeType != null) {
- command.add("--nodeType");
- command.add(nodeType);
- }
-
- jsonMapper.writeValue(taskFile, task);
-
- log.info("Running command: %s", Joiner.on(" ").join(command));
- taskWorkItem.processHolder = new ProcessHolder(
- new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
- logFile,
- childPort
- );
-
- processHolder = taskWorkItem.processHolder;
- processHolder.registerWithCloser(closer);
- }
-
- log.info("Logging task %s output to: %s", task.getId(), logFile);
- boolean runFailed = true;
-
- try (final OutputStream toLogfile = Files.asByteSink(logFile).openBufferedStream()) {
- ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
- final int statusCode = processHolder.process.waitFor();
- log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
- if (statusCode == 0) {
- runFailed = false;
- }
- }
- finally {
- // Upload task logs
- taskLogPusher.pushTaskLog(task.getId(), logFile);
- }
-
- if (!runFailed) {
- // Process exited successfully
- return jsonMapper.readValue(statusFile, TaskStatus.class);
- } else {
- // Process exited unsuccessfully
- return TaskStatus.failure(task.getId());
- }
- } catch (Throwable t) {
- throw closer.rethrow(t);
- } finally {
- closer.close();
- }
- }
- catch (Throwable t) {
- log.info(t, "Exception caught during execution");
- throw Throwables.propagate(t);
- }
- finally {
- try {
- synchronized (tasks) {
- final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
- if (taskWorkItem != null && taskWorkItem.processHolder != null) {
- taskWorkItem.processHolder.process.destroy();
- }
- }
- portFinder.markPortUnused(childPort);
- log.info("Removing temporary directory: %s", attemptDir);
- FileUtils.deleteDirectory(attemptDir);
- }
- catch (Exception e) {
- log.error(e, "Suppressing exception caught while cleaning up task");
- }
- }
- }
- }
- )
- )
- );
+ final CountDownLatch leaderLatch = new CountDownLatch(1);
+
+ // Submit a new task which will launch the job, then wait on an attach to the job
+ final ListenableFuture<TaskStatus> startingFuture = exec.submit(
+ new Callable<TaskStatus>()
+ {
+ @Override
+ public TaskStatus call()
+ {
+ try {
+ leaderLatch.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw Throwables.propagate(e);
+ }
+ final int childPort = portFinder.findUnusedPort();
+ final File attemptDir = getNewTaskAttemptDir(task.getId());
+ final String attemptUUID = attemptDir.getName();
+ final ProcessHolder processHolder = new ProcessHolder(task.getId(), attemptUUID, childPort);
+ final Path attemptPath = attemptDir.toPath();
+
+ try {
+ final File taskFile = new File(attemptDir, TASK_FILE_NAME);
+ final File statusFile = new File(attemptDir, STATUS_FILE_NAME);
+ final File logFile = new File(attemptDir, LOG_FILE_NAME);
+ final File portFile = new File(attemptDir, PORT_FILE_NAME);
+
+ if (!taskFile.exists() && !taskFile.createNewFile()) {
+ throw new IOException(String.format("Could not create file [%s]", taskFile));
+ }
+ if (!statusFile.exists() && !statusFile.createNewFile()) {
+ throw new IOException(String.format("Could not create file [%s]", statusFile));
+ }
+ if (!logFile.exists() && !logFile.createNewFile()) {
+ throw new IOException(String.format("Could not create file [%s]", logFile));
+ }
+
+ // time to adjust process holders
+ final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
+
+ if (taskWorkItem == null) {
+ log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
+ throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
+ }
+
+ if (taskWorkItem.shutdown.get()) {
+ throw new IllegalStateException("Task has been shut down!");
+ }
+
+ if (taskWorkItem.processHolder.get() != null) {
+ // Fail early, there is also a last second check later on
+ log.makeAlert("WTF?! TaskInfo already has a processHolder")
+ .addData("task", task.getId())
+ .emit();
+ throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
+ }
+
+ final List<String> command = Lists.newArrayList();
+ final String childHost = node.getHost();
+ final String taskClasspath;
+ if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
+ taskClasspath = Joiner.on(File.pathSeparator).join(
+ task.getClasspathPrefix(),
+ config.getClasspath()
+ );
+ } else {
+ taskClasspath = config.getClasspath();
+ }
+
+ command.add(config.getJavaCommand());
+ command.add("-cp");
+ command.add(taskClasspath);
+
+ Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
+
+ for (String propName : props.stringPropertyNames()) {
+ for (String allowedPrefix : config.getAllowedPrefixes()) {
+ if (propName.startsWith(allowedPrefix)) {
+ command.add(
+ String.format(
+ "-D%s=%s",
+ propName,
+ props.getProperty(propName)
+ )
+ );
+ }
+ }
+ }
+
+ // Override child JVM specific properties
+ for (String propName : props.stringPropertyNames()) {
+ if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
+ command.add(
+ String.format(
+ "-D%s=%s",
+ propName.substring(CHILD_PROPERTY_PREFIX.length()),
+ props.getProperty(propName)
+ )
+ );
+ }
+ }
+
+ command.add(String.format("-Ddruid.host=%s", childHost));
+ command.add(String.format("-Ddruid.port=%d", childPort));
+
+ command.add(String.format("-D" + TASKID_PROPERTY + "=%s", task.getId()));
+
+ command.add("io.druid.cli.Main");
+ command.add("internal");
+ command.add("peon");
+ command.add(taskFile.toString());
+ command.add(statusFile.toString());
+ command.add(portFile.toString());
+ String nodeType = task.getNodeType();
+ if (nodeType != null) {
+ command.add("--nodeType");
+ command.add(nodeType);
+ }
+
+ jsonMapper.writeValue(taskFile, task);
+ try (WatchService watchService = attemptPath.getFileSystem().newWatchService()) {
+ attemptPath.register(
+ watchService,
+ StandardWatchEventKinds.ENTRY_CREATE,
+ StandardWatchEventKinds.ENTRY_MODIFY
+ );
+ log.info("Running command: %s", Joiner.on(" ").join(command));
+ // Process can continue running in the background. We monitor via files rather than Process
+ final Process process = new ProcessBuilder(command)
+ .redirectError(logFile)
+ .redirectOutput(logFile)
+ .start();
+ if (!taskWorkItem.processHolder.compareAndSet(null, processHolder)) {
+ final String msg = String.format(
+ "WTF!? Expected empty process holder and found [%s]",
+ taskWorkItem.processHolder.get()
+ );
+ log.makeAlert("%s", msg).emit();
+ throw new ISE("%s", msg);
+ }
+
+ log.info("Logging task %s output to: %s", task.getId(), logFile);
+ // Wait for files to be modified by task starting
+ log.debug("Waiting for task [%s] to start", processHolder);
+ watchService.take();// Should only be modified by task
+ log.debug("Waiting for task [%s] to finish", processHolder);
+ return attach(task.getId(), leaderLatch, MoreExecutors.sameThreadExecutor()).get();
+ }
+ }
+ catch(InterruptedException e)
+ {
+ log.info("Interrupted while waiting for task to start [%s]", processHolder);
+ Thread.currentThread().interrupt();
+ throw Throwables.propagate(e);
+ }
+ catch(ExecutionException e){
+ final Throwable eCause = e.getCause();
+ if(eCause instanceof InterruptedException){
+ log.info(e, "Attach interrupted for [%s]", processHolder);
+ Thread.currentThread().interrupt();
+ } else {
+ log.info(e, "Exception during execution of attach for [%s]", processHolder);
+ }
+ throw Throwables.propagate(e);
+ }
+ catch (Throwable t) {
+ log.info(t, "Exception caught during forking of [%s]", processHolder);
+ throw Throwables.propagate(t);
+ }
+ }
+ }
+ );
+ try {
+ final ForkingTaskRunnerWorkItem workItem = new ForkingTaskRunnerWorkItem(task.getId(), startingFuture);
+ // Leader election for task id
+ final ForkingTaskRunnerWorkItem leaderItem = tasks.putIfAbsent(task.getId(), workItem);
+ if (leaderItem != null) {
+ startingFuture.cancel(true);
+ log.warn("Already have task id [%s], returning prior task instead", task.getId());
+ return leaderItem.getResult();
+ } else {
+ return workItem.getResult();
}
+ }
+ finally {
+ leaderLatch.countDown();
+ }
+ }
- return tasks.get(task.getId()).getResult();
+
+ // This assumes that no task directories can be created except in the ForkingTaskRunner,
+ // And that the ForkingTaskRunner has exclusive ownership of the directory structure
+ @LifecycleStart
+ public synchronized void start()
+ {
+ if (stopped.get()) {
+ throw new ISE("Already stopped!");
+ }
+
+ populateMissingTasksFromDir();
+
+ if (!started.compareAndSet(false, true)) {
+ throw new ISE("Already started");
}
}
@LifecycleStop
- public void stop()
+ public synchronized void stop()
{
- synchronized (tasks) {
- exec.shutdown();
-
- for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
- if (taskWorkItem.processHolder != null) {
- log.info("Destroying process: %s", taskWorkItem.processHolder.process);
- taskWorkItem.processHolder.process.destroy();
- }
- }
+ if (!started.get()) {
+ throw new ISE("Not started");
+ }
+ if (!stopped.compareAndSet(false, true)) {
+ throw new ISE("Already stopped");
}
+ exec.shutdown();
}
@Override
public void shutdown(final String taskid)
{
- final ForkingTaskRunnerWorkItem taskInfo;
+ final ForkingTaskRunnerWorkItem taskInfo = tasks.get(taskid);
+
+ if (taskInfo == null) {
+ log.info("Ignoring request to cancel unknown task: %s", taskid);
+ return;
+ }
- synchronized (tasks) {
- taskInfo = tasks.get(taskid);
+ if (!taskInfo.shutdown.compareAndSet(false, true)) {
+ log.warn("Someone already shut down task [%s]. Ignoring request", taskid);
+ return;
+ }
- if (taskInfo == null) {
- log.info("Ignoring request to cancel unknown task: %s", taskid);
- return;
- }
- taskInfo.shutdown = true;
+ final ProcessHolder processHolder = taskInfo.processHolder.get();
+ if (processHolder == null) {
+ log.wtf("Task has no process holder!?");
+ return;
}
+ // Check to see if foreign process needs to be killed
+ if (processHolder.taskPortFile.exists()) {
+ try {
+ log.info("Killing task [%s] attempt [%s]", taskid, processHolder.attemptId);
+ final URL url = new URL("http", "localhost", processHolder.port, "/shutdown");
+ httpClient.go(
+ new Request(HttpMethod.DELETE, url),
+ new HttpResponseHandler()
+ {
+ @Override
+ public ClientResponse handleResponse(HttpResponse response)
+ {
+ HttpResponseStatus status = response.getStatus();
+ log.debug(
+ "Received status code %d [%s] for shutdown request",
+ status.getCode(),
+ status.getReasonPhrase()
+ );
+ if (status.getCode() != HttpResponseStatus.ACCEPTED.getCode()) {
+ final String msg = String.format(
+ "Bad status code. Received [%d]:[%s] from url [%s]",
+ status.getCode(),
+ status.getReasonPhrase(),
+ url
+ );
+ throw new RuntimeException(msg);
+ }
+ return ClientResponse.finished(null);
+ }
- if (taskInfo.processHolder != null) {
- // Will trigger normal failure mechanisms due to process exit
- log.info("Killing process for task: %s", taskid);
- taskInfo.processHolder.process.destroy();
+ @Override
+ public ClientResponse handleChunk(
+ ClientResponse clientResponse, HttpChunk chunk
+ )
+ {
+ log.info("Received chunk... why?");
+ return clientResponse;
+ }
+
+ @Override
+ public ClientResponse done(ClientResponse clientResponse)
+ {
+ return clientResponse;
+ }
+
+ @Override
+ public void exceptionCaught(ClientResponse clientResponse, Throwable e)
+ {
+ log.error(e, "Error in command execution");
+ }
+ }
+ ).get();
+ try {
+ processHolder.awaitShutdown(config.getSoftShutdownTimelimit());
+ }
+ catch (TimeoutException e) {
+ log.info(
+ "Timed out waiting for clean shutdown on task [%s]. Forcing shutdown...",
+ taskInfo.processHolder.get()
+ );
+ if (!forceKill(processHolder)) {
+ if (processHolder.taskPortFile.exists()) {
+ throw new RuntimeException("Unable to shutdown task!");
+ } else {
+ log.info("Task shutdown on its own");
+ }
+ }
+ }
+ taskInfo.getResult().get();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw Throwables.propagate(e);
+ }
+ catch (MalformedURLException | ExecutionException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+
+ /**
+ * Unix (and maybe Oracle VM) specific killer for processes
+ *
+ * @param processHolder The process holder of interest
+ *
+ * @return True if the task was killed via this method, false otherwise.
+ *
+ * @throws InterruptedException If the waiting on system `kill` commands is interrupted.
+ */
+ private boolean forceKill(final ProcessHolder processHolder) throws InterruptedException
+ {
+ final String portString = String.format("%d", processHolder.port);
+ final List<VirtualMachineDescriptor> vms = ImmutableList.copyOf(
+ Collections2.filter(
+ VirtualMachine.list(),
+ new Predicate<VirtualMachineDescriptor>()
+ {
+ @Override
+ public boolean apply(VirtualMachineDescriptor input)
+ {
+ try {
+ return portString.equals(
+ input.provider()
+ .attachVirtualMachine(input)
+ .getSystemProperties()
+ .getProperty("druid.port")
+ );
+ }
+ catch (IOException | AttachNotSupportedException e) {
+ log.warn(e, "Could not read property from vm");
+ return false;
+ }
+ }
+ }
+ )
+ );
+ if (vms.isEmpty()) {
+ log.warn("Could not find vm for taskid [%s] using port [%d]!", processHolder.taskId, processHolder.port);
+ return false;
+ }
+ final VirtualMachineDescriptor vmd = vms.get(0);
+ try {
+ final int pid = Integer.parseInt(vmd.id());
+ log.info("Forcing kill of task [%s] on pid [%d]", processHolder.taskId, pid);
+ }
+ catch (NumberFormatException e) {
+ log.error("Could not find pid for task [%s]. VM id [%s] is not an integer", processHolder.taskId, vmd.id());
+ return false;
+ }
+ try {
+ Process killingProcess = new ProcessBuilder(ImmutableList.of("kill", "-15", vmd.id()))
+ .redirectOutput(ProcessBuilder.Redirect.PIPE)
+ .redirectError(ProcessBuilder.Redirect.PIPE)
+ .start();
+ int retval = killingProcess.waitFor();
+ if (retval == 0) {
+ processHolder.awaitShutdown(config.getSoftShutdownTimelimit());
+ return true;
+ }
+ try (InputStream inputStream = killingProcess.getInputStream()) {
+ Scanner scanner = new Scanner(inputStream).useDelimiter("\\A");
+ log.error(
+ "Term of pid [%s] did not succeed with code [%d]: [%s]",
+ vmd.id(),
+ retval,
+ scanner.hasNext() ? scanner.next() : "null"
+ );
+ }
+ killingProcess = new ProcessBuilder(ImmutableList.of("kill", "-9", vmd.id()))
+ .redirectOutput(ProcessBuilder.Redirect.PIPE)
+ .redirectError(ProcessBuilder.Redirect.PIPE)
+ .start();
+ retval = killingProcess.waitFor();
+ if (retval == 0) {
+ processHolder.awaitShutdown(config.getSoftShutdownTimelimit());
+ return true;
+ }
+ try (InputStream inputStream = killingProcess.getInputStream()) {
+ Scanner scanner = new Scanner(inputStream).useDelimiter("\\A");
+ log.error(
+ "Kill of pid [%s] did not succeed with code [%d]: [%s]",
+ vmd.id(),
+ retval,
+ scanner.hasNext() ? scanner.next() : "null"
+ );
+ }
+ return false;
+ }
+ catch (IOException | TimeoutException e) {
+ throw Throwables.propagate(e);
}
}
@Override
 public Collection<TaskRunnerWorkItem> getRunningTasks()
{
- synchronized (tasks) {
- final List ret = Lists.newArrayList();
- for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
- if (taskWorkItem.processHolder != null) {
- ret.add(taskWorkItem);
+ return Collections2.transform(
+ Collections2.filter(
+ tasks.values(),
+ new Predicate<ForkingTaskRunnerWorkItem>()
+ {
+ @Override
+ public boolean apply(ForkingTaskRunnerWorkItem input)
+ {
+ return input.processHolder.get() != null;
+ }
+ }
+ ),
+ new Function<ForkingTaskRunnerWorkItem, TaskRunnerWorkItem>()
+ {
+ @Override
+ public TaskRunnerWorkItem apply(ForkingTaskRunnerWorkItem input)
+ {
+ return input;
+ }
}
- }
- return ret;
- }
+ );
}
@Override
 public Collection<TaskRunnerWorkItem> getPendingTasks()
{
- synchronized (tasks) {
- final List ret = Lists.newArrayList();
- for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
- if (taskWorkItem.processHolder == null) {
- ret.add(taskWorkItem);
+ return Collections2.transform(
+ Collections2.filter(
+ tasks.values(),
+ new Predicate<ForkingTaskRunnerWorkItem>()
+ {
+ @Override
+ public boolean apply(ForkingTaskRunnerWorkItem input)
+ {
+ return input.processHolder.get() == null;
+ }
+ }
+ ),
+ new Function<ForkingTaskRunnerWorkItem, TaskRunnerWorkItem>()
+ {
+ @Override
+ public TaskRunnerWorkItem apply(ForkingTaskRunnerWorkItem input)
+ {
+ return input;
+ }
}
- }
- return ret;
- }
+ );
}
@Override
 public Collection<TaskRunnerWorkItem> getKnownTasks()
{
- synchronized (tasks) {
- return Lists.newArrayList(tasks.values());
- }
+ return ImmutableList.copyOf(tasks.values());
}
@Override
@@ -371,15 +768,22 @@ public Collection getWorkers()
@Override
 public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
{
- final ProcessHolder processHolder;
+ final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid);
- synchronized (tasks) {
- final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid);
- if (taskWorkItem != null && taskWorkItem.processHolder != null) {
- processHolder = taskWorkItem.processHolder;
- } else {
- return Optional.absent();
- }
+ if (taskWorkItem == null) {
+ return Optional.absent();
+ }
+
+ final ProcessHolder processHolder = taskWorkItem.processHolder.get();
+
+ if (processHolder == null) {
+ return Optional.absent();
+ }
+
+ final File logFile = getLogFile(processHolder);
+
+ if (!logFile.exists()) {
+ return Optional.absent();
}
return Optional.of(
@@ -388,16 +792,298 @@ public Optional streamTaskLog(final String taskid, final long offset
@Override
public InputStream openStream() throws IOException
{
- return LogUtils.streamFile(processHolder.logFile, offset);
+ return LogUtils.streamFile(logFile, offset);
}
}
);
}
+  /**
+   * Scans the base task directory for task attempts left behind by a previous ForkingTaskRunner
+   * instance. For each task's latest attempt: if the peon's port file is present we re-attach to
+   * the still-running peon; otherwise we attach only long enough to upload logs and clean up.
+   *
+   * Must be called before this runner has been started or stopped.
+   */
+  private void populateMissingTasksFromDir()
+  {
+    if (started.get() || stopped.get()) {
+      // This might be safe to do, but this method assumes this is not the case
+      throw new ISE("Cannot populate tasks from dirs once ForkingTaskRunner has been started");
+    }
+    //------------------------------------------------------------------ Find attempt directories
+    final File baseDir = taskConfig.getBaseTaskDir();
+    final File[] taskDirFileArray = baseDir.listFiles();
+    final Collection<File> taskDirFileList = // Only directories
+        taskDirFileArray == null ?
+        ImmutableList.<File>of() :
+        Collections2.filter(
+            Arrays.asList(taskDirFileArray),
+            new Predicate<File>()
+            {
+              @Override
+              public boolean apply(File input)
+              {
+                return input.exists() && input.isDirectory();
+              }
+            }
+        );
+    if (taskDirFileList.isEmpty()) {
+      log.info("No task dirs found in [%s]", baseDir);
+      return;
+    }
+
+    // For the task directories, look for attempt directories
+    for (File potentialTaskDir : taskDirFileList) {
+      final File[] taskAttemptDirFileArray = potentialTaskDir.listFiles();
+      final Collection<File> taskAttemptDirFileList = // Only directories containing a non-zero TASK_FILE_NAME file
+          taskAttemptDirFileArray == null ?
+          ImmutableList.<File>of() :
+          Collections2.filter(
+              Arrays.asList(taskAttemptDirFileArray),
+              new Predicate<File>()
+              {
+                @Override
+                public boolean apply(File input)
+                {
+                  if (!input.isDirectory()) {
+                    return false;
+                  }
+                  final File[] found = input.listFiles(
+                      new FilenameFilter()
+                      {
+                        @Override
+                        public boolean accept(File dir, String name)
+                        {
+                          return TASK_FILE_NAME.equals(name)
+                                 && new File(dir, TASK_FILE_NAME).length() > 0;
+                        }
+                      }
+                  );
+                  // listFiles() returns an EMPTY ARRAY (not null) when nothing matches the
+                  // filter, so we must check the length; null only signals an I/O error.
+                  return found != null && found.length > 0;
+                }
+              }
+          );
+      if (taskAttemptDirFileList.isEmpty()) {
+        log.info("Directory [%s] has no viable task attempts, attempting to cleanup if empty", potentialTaskDir);
+        if (!potentialTaskDir.delete()) {
+          log.warn("Could not clean up [%s]", potentialTaskDir);
+        }
+        continue;
+      }
+      // Find latest attempt in directory: prefer the highest numeric attempt name; fall back to
+      // modification time only for attempt dirs whose names do not parse as numbers.
+      long lastAttempt = 0;
+      File latestAttemptDir = null;
+      for (File taskAttemptDir : taskAttemptDirFileList) {
+        if (!taskAttemptDir.isDirectory()) {
+          log.debug("Skipping non-directory [%s]", taskAttemptDir);
+          continue;
+        }
+        boolean parsed = false;
+        try {
+          final long checkAttempt = Long.parseLong(taskAttemptDir.getName());
+          parsed = true;
+          if (checkAttempt > lastAttempt) {
+            // Use the directory itself; reconstructing it via
+            // getTaskAttemptDir(taskAttemptDir.getName(), checkAttempt) would wrongly treat
+            // the attempt name as the task id.
+            latestAttemptDir = taskAttemptDir;
+            lastAttempt = checkAttempt;
+          }
+        }
+        catch (NumberFormatException e) {
+          log.debug(e, "Skipping unparsable directory [%s]", taskAttemptDir);
+        }
+        if (!parsed) {
+          // Non-numeric attempt name: fall back to most-recently-modified, but never override
+          // a numerically-selected attempt.
+          if (latestAttemptDir == null || latestAttemptDir.lastModified() < taskAttemptDir.lastModified()) {
+            latestAttemptDir = taskAttemptDir;
+          }
+        }
+      }
+      if (latestAttemptDir == null) {
+        log.wtf("I had directories in [%s] but now I don't... memory corruption?", potentialTaskDir);
+        continue;
+      }
+
+      //------------------------------------------------------------------ Load up data from suspected good attempt dirs
+
+      // We already checked earlier that this exists and is non zero
+      final File taskFile = new File(latestAttemptDir, TASK_FILE_NAME);
+      final Task task;
+      try {
+        task = jsonMapper.readValue(taskFile, Task.class);
+      }
+      catch (IOException e) {
+        log.makeAlert(e, "Corrupted task file at [%s]", taskFile).emit();
+        continue;
+      }
+
+      final File statusFile = new File(latestAttemptDir, STATUS_FILE_NAME);
+
+      if (!statusFile.exists()) {
+        // Shouldn't be missing unless there's corruption somehow.
+        log.makeAlert("Status file [%s] is missing ", statusFile).emit();
+        continue;
+      }
+
+      final File portFile = new File(latestAttemptDir, PORT_FILE_NAME);
+      Integer port = null;
+      if (portFile.exists()) {
+        // Ooookkkkk, so here's where it gets interesting. The PORT_FILE_NAME is the intended indicator for the
+        // JVM instance itself. Only the PEON is in charge of writing or deleting this file.
+        // As such, there are a number of error cases about concurrent modifications that are taken into account here.
+        // For example, a task is forked but then the forking task runner dies or is stopped. What do you do if the
+        // Forking Task Runner and peon are starting up at the same time?
+        try (FileChannel portFileChannel = FileChannel.open(
+            portFile.toPath(),
+            StandardOpenOption.READ,
+            StandardOpenOption.WRITE // Required for lock
+        )) {
+          final ByteBuffer buffer;
+          final FileLock fileLock = portFileChannel.lock(); // To make sure the peon is done writing before we try to read
+          try {
+            if (portFileChannel.size() > Integer.MAX_VALUE) {
+              // Probably should never happen
+              log.makeAlert(
+                  "port file [%s] for task [%s] is HUGE %d bytes",
+                  portFile,
+                  task.getId(),
+                  portFileChannel.size()
+              ).emit();
+              continue;
+            }
+            buffer = ByteBuffer.allocate((int) portFileChannel.size());
+            portFileChannel.read(buffer);
+            buffer.rewind();
+          }
+          finally {
+            fileLock.release();
+          }
+          final String portString = Charsets.UTF_8.newDecoder().decode(buffer).toString();
+          port = Integer.parseInt(portString);
+        }
+        catch (FileNotFoundException e) {
+          log.info(e, "Task [%s] attempt [%s] exited during check", task.getId(), latestAttemptDir.getName());
+          port = null;
+        }
+        catch (IOException | NumberFormatException e) {
+          if (portFile.exists()) {
+            // Something went wrong during write of value from peon's side
+            log.makeAlert(e, "Port file [%s] for task [%s] is corrupt", portFile, task.getId()).emit();
+            continue;
+          }
+          // Exited during read
+          log.info(e, "Task [%s] attempt [%s] exited during read", task.getId(), latestAttemptDir.getName());
+          port = null;
+        }
+      }
+      if (port == null) {
+        // At this point there should be one of two scenarios:
+        // A) The peon has exited between ForkingTaskRunner instances
+        // B) The peon is still starting up and hasn't written the port file yet
+        log.debug("Found no port file for task [%s]. Uploading log and cleaning", task.getId()); // task ids are strings, so %s (was %d)
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+        try {
+          final ListenableFuture future;
+          final ForkingTaskRunnerWorkItem workItem;
+          try {
+            future = attach(task.getId(), doneLatch, exec);
+            workItem = new ForkingTaskRunnerWorkItem(task.getId(), future);
+            workItem.processHolder.set( // use set() rather than re-assigning the AtomicReference field
+                new ProcessHolder(
+                    task.getId(),
+                    latestAttemptDir.getName(),
+                    0
+                )
+            );
+            tasks.put(task.getId(), workItem);
+          }
+          finally {
+            doneLatch.countDown();
+          }
+          future.get();
+          workItem.processHolder.get().awaitShutdown(100L);
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw Throwables.propagate(e);
+        }
+        catch (TimeoutException | ExecutionException e) {
+          log.makeAlert(e, "Could not upload data for task [%s] which finished between runs", task.getId()).emit();
+        }
+        continue;
+      } else {
+        portFinder.markPortUsed(port);
+      }
+      final ProcessHolder processHolder = new ProcessHolder(task.getId(), latestAttemptDir.getName(), port);
+      final CountDownLatch leaderLatch = new CountDownLatch(1);
+      final ForkingTaskRunnerWorkItem workItem = new ForkingTaskRunnerWorkItem(
+          task.getId(),
+          attach(task.getId(), leaderLatch, exec)
+      );
+      try {
+        workItem.processHolder.set(processHolder);
+        if (tasks.putIfAbsent(task.getId(), workItem) != null) {
+          log.warn("Task [%s] already exists!", task.getId());
+          workItem.getResult().cancel(true);
+        } else {
+          log.info("Found task [%s] in progress", processHolder);
+        }
+      }
+      finally {
+        leaderLatch.countDown();
+      }
+    }
+  }
+
+  /** Root directory holding all attempt dirs for the given task: {@code <baseTaskDir>/<taskId>}. */
+  private File getTaskDir(String taskId)
+  {
+    return new File(taskConfig.getBaseTaskDir(), taskId);
+  }
+
+  /**
+   * Creates and returns a fresh attempt directory for the given task, named one past the highest
+   * existing numeric attempt (zero-padded, see getTaskAttemptDir).
+   *
+   * @throws ISE if a new attempt directory cannot be created
+   */
+  private File getNewTaskAttemptDir(String taskId)
+  {
+    final File taskDir = getTaskDir(taskId);
+    if (!taskDir.exists()) {
+      taskDir.mkdirs();
+    }
+    final File[] files = taskDir.listFiles();
+    long maxAttempt = 0;
+    if (files != null) {
+      for (File file : files) {
+        if (file.isDirectory()) {
+          try {
+            final long attempt = Long.parseLong(file.getName());
+            if (attempt > maxAttempt) {
+              maxAttempt = attempt;
+            }
+          }
+          catch (NumberFormatException e) {
+            log.debug(e, "couldn't parse directory [%s]", file);
+          }
+        }
+      }
+    }
+    // mkdirs() returns false both when the dir already exists (we raced another attempt) and on
+    // hard I/O or permission failures. Retry a bounded number of times instead of spinning
+    // forever on a failure that will never succeed.
+    for (long attempt = maxAttempt + 1; attempt <= maxAttempt + 100; ++attempt) {
+      final File file = getTaskAttemptDir(taskId, attempt);
+      if (file.mkdirs()) {
+        return file;
+      }
+    }
+    throw new ISE("Unable to create a new attempt dir for task [%s] under [%s]", taskId, taskDir);
+  }
+
+  /** Attempt dir for a numeric attempt, zero-padded to at least 4 digits (e.g. "0001"). */
+  private File getTaskAttemptDir(String taskId, long attempt_num)
+  {
+    return new File(getTaskDir(taskId), String.format("%04d", attempt_num));
+  }
+
+  /** Attempt dir addressed by its raw attempt-id string (as found on disk). */
+  private File getTaskAttemptDir(String taskId, String attemptId)
+  {
+    return new File(getTaskDir(taskId), attemptId);
+  }
+
+  /** Location of the peon log file (LOG_FILE_NAME) inside the given task attempt dir. */
+  private File getLogFile(String taskId, String attemptId)
+  {
+    return new File(getTaskAttemptDir(taskId, attemptId), LOG_FILE_NAME);
+  }
+
+  /** Convenience overload: log file for the attempt a ProcessHolder points at. */
+  private File getLogFile(ProcessHolder processHolder)
+  {
+    return getLogFile(processHolder.taskId, processHolder.attemptId);
+  }
+
private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
- private volatile boolean shutdown = false;
- private volatile ProcessHolder processHolder = null;
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
+ private AtomicReference processHolder = new AtomicReference<>(null);
private ForkingTaskRunnerWorkItem(
String taskId,
@@ -408,23 +1094,55 @@ private ForkingTaskRunnerWorkItem(
}
}
- private static class ProcessHolder
+ private class ProcessHolder
{
- private final Process process;
- private final File logFile;
+ private final String taskId;
+ private final String attemptId;
private final int port;
+ private final File taskPortFile;
- private ProcessHolder(Process process, File logFile, int port)
+ private ProcessHolder(String taskId, String attemptId, int port)
{
- this.process = process;
- this.logFile = logFile;
+ this.taskId = taskId;
+ this.attemptId = attemptId;
this.port = port;
+ taskPortFile = new File(getTaskAttemptDir(taskId, attemptId), PORT_FILE_NAME);
+ }
+
+    /**
+     * Blocks until the peon deletes its port file (its shutdown signal), or until timeoutMS
+     * milliseconds have elapsed.
+     *
+     * @throws TimeoutException if the port file still exists when the timeout expires
+     */
+    public void awaitShutdown(long timeoutMS) throws InterruptedException, TimeoutException
+    {
+      final long startTime = System.currentTimeMillis();
+      final Path taskPath = taskPortFile.toPath();
+      try (WatchService watchService = taskPath.getFileSystem().newWatchService()) {
+        taskPath.getParent().register(watchService, StandardWatchEventKinds.ENTRY_DELETE);
+        while (taskPortFile.exists()) {
+          final long delta = System.currentTimeMillis() - startTime;
+          if (timeoutMS <= delta) {
+            throw new TimeoutException("Waiting for the right delete event");
+          }
+          // Fully-qualified to avoid requiring a new import in this patch.
+          final java.nio.file.WatchKey key = watchService.poll(timeoutMS - delta, TimeUnit.MILLISECONDS);
+          if (key == null) {
+            throw new TimeoutException("Waiting for delete event to register");
+          }
+          // A WatchKey delivers no further events until it is drained and reset; without this,
+          // an unrelated delete in the same directory would wedge the loop until timeout.
+          key.pollEvents();
+          key.reset();
+        }
+      }
+      catch (IOException e) {
+        throw Throwables.propagate(e);
+      }
+    }
- private void registerWithCloser(Closer closer)
+ @Override
+ public String toString()
{
- closer.register(process.getInputStream());
- closer.register(process.getOutputStream());
+ return "ProcessHolder{" +
+ "taskId='" + taskId + '\'' +
+ ", attemptId='" + attemptId + '\'' +
+ ", port=" + port +
+ '}';
}
}
+
+  /** Whether this runner's start() has completed (reads the {@code started} flag). */
+  public boolean isStarted()
+  {
+    return started.get();
+  }
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunnerFactory.java
index 6183894092ed..6d9ed4b98e60 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunnerFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunnerFactory.java
@@ -19,6 +19,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
+import com.metamx.http.client.HttpClient;
+import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
@@ -39,6 +41,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
private final ObjectMapper jsonMapper;
private final TaskLogPusher persistentTaskLogs;
private final DruidNode node;
+ private final HttpClient httpClient;
@Inject
public ForkingTaskRunnerFactory(
@@ -48,7 +51,8 @@ public ForkingTaskRunnerFactory(
final Properties props,
final ObjectMapper jsonMapper,
final TaskLogPusher persistentTaskLogs,
- @Self DruidNode node
+ @Self DruidNode node,
+ @Global HttpClient httpClient
) {
this.config = config;
this.taskConfig = taskConfig;
@@ -57,11 +61,12 @@ public ForkingTaskRunnerFactory(
this.jsonMapper = jsonMapper;
this.persistentTaskLogs = persistentTaskLogs;
this.node = node;
+ this.httpClient = httpClient;
}
@Override
public TaskRunner build()
{
- return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node);
+ return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node, httpClient);
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
index be3c1d7412d4..ac5516c5fd13 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
@@ -74,9 +74,28 @@ public synchronized int findUnusedPort()
return port;
}
- public synchronized void markPortUnused(int port)
+ /**
+ * Force a port to be thought of as used.
+ *
+ * @param port The port of interest
+ *
+ * @return Result of Set.add
+ */
+ public synchronized boolean markPortUsed(int port)
{
- usedPorts.remove(port);
+ return usedPorts.add(port);
+ }
+
+ /**
+ * Force removing a port from the list of used ports
+ *
+ * @param port The port to remove
+ *
+ * @return The result of Set.remove
+ */
+ public synchronized boolean markPortUnused(int port)
+ {
+ return usedPorts.remove(port);
}
private int chooseNext(int start)
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
index f35825b7cf27..46727ba87cfd 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
@@ -49,6 +49,14 @@ public class ForkingTaskRunnerConfig
@Max(65535)
private int startPort = 8100;
+
+ /**
+ * This is the time (in ms) that the forking task runner should allow the task to softly shutdown before trying to forcibly kill it.
+ */
+ @JsonProperty
+ @Min(0)
+ private long softShutdownTimelimit = 30_000;
+
@JsonProperty
@NotNull
List allowedPrefixes = Lists.newArrayList(
@@ -85,4 +93,15 @@ public List getAllowedPrefixes()
{
return allowedPrefixes;
}
+
+ public ForkingTaskRunnerConfig setSoftShutdownTimeLimit(@Min(0) long limit)
+ {
+ this.softShutdownTimelimit = limit;
+ return this;
+ }
+
+ public long getSoftShutdownTimelimit()
+ {
+ return softShutdownTimelimit;
+ }
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java
index 2850293231ed..11d794792fe8 100644
--- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java
+++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java
@@ -18,9 +18,11 @@
package io.druid.indexing.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.io.ByteSink;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
@@ -29,14 +31,32 @@
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
+import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
-
+import io.druid.server.DruidNode;
+
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.CharBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
/**
@@ -51,8 +71,7 @@ public class ExecutorLifecycle
private final TaskActionClientFactory taskActionClientFactory;
private final TaskRunner taskRunner;
private final ObjectMapper jsonMapper;
-
- private final ExecutorService parentMonitorExec = Execs.singleThreaded("parent-monitor-%d");
+ private final DruidNode druidNode;
private volatile ListenableFuture statusFuture = null;
@@ -61,13 +80,15 @@ public ExecutorLifecycle(
ExecutorLifecycleConfig config,
TaskActionClientFactory taskActionClientFactory,
TaskRunner taskRunner,
- ObjectMapper jsonMapper
+ ObjectMapper jsonMapper,
+ @Self DruidNode druidNode
)
{
this.config = config;
this.taskActionClientFactory = taskActionClientFactory;
this.taskRunner = taskRunner;
this.jsonMapper = jsonMapper;
+ this.druidNode = druidNode;
}
@LifecycleStart
@@ -75,7 +96,21 @@ public void start()
{
final File taskFile = Preconditions.checkNotNull(config.getTaskFile(), "taskFile");
final File statusFile = Preconditions.checkNotNull(config.getStatusFile(), "statusFile");
- final InputStream parentStream = Preconditions.checkNotNull(config.getParentStream(), "parentStream");
+ final File portFile = Preconditions.checkNotNull(config.getPortFile(), "portFile");
+
+ try(final FileChannel portFileChannel = FileChannel.open(portFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
+ final String portStr = String.format("%d", druidNode.getPort());
+ final FileLock lock = portFileChannel.lock();
+ try {
+ CharsetEncoder encoder = Charsets.UTF_8.newEncoder();
+ portFileChannel.write(encoder.encode(CharBuffer.wrap(portStr)));
+ }finally{
+ lock.release();
+ }
+ }
+ catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
final Task task;
@@ -91,30 +126,6 @@ public void start()
throw Throwables.propagate(e);
}
- // Spawn monitor thread to keep a watch on parent's stdin
- // If stdin reaches eof, the parent is gone, and we should shut down
- parentMonitorExec.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- while (parentStream.read() != -1) {
- // Toss the byte
- }
- }
- catch (Exception e) {
- log.error(e, "Failed to read from stdin");
- }
-
- // Kind of gross, but best way to kill the JVM as far as I know
- log.info("Triggering JVM shutdown.");
- System.exit(2);
- }
- }
- );
-
// Won't hurt in remote mode, and is required for setting up locks in local mode:
try {
if (!task.isReady(taskActionClientFactory.create(task))) {
@@ -166,6 +177,12 @@ public void join()
@LifecycleStop
public void stop()
{
- parentMonitorExec.shutdown();
+ final File portFile = Preconditions.checkNotNull(config.getPortFile(), "portFile");
+ if(!portFile.delete())
+ {
+ log.warn("Unable to delete task port file at [%s]", portFile.toString());
+ } else {
+ log.info("Deleted port file at [%s]", portFile.toString());
+ }
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java
index 0f87ca403cbb..a584f870f2d1 100644
--- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java
@@ -38,8 +38,9 @@ public class ExecutorLifecycleConfig
private File statusFile = null;
@JsonProperty
- @Pattern(regexp = "\\{stdin\\}")
- private String parentStreamName = "stdin";
+ @NotNull
+ private File portFile = null;
+
public File getTaskFile()
{
@@ -63,24 +64,14 @@ public ExecutorLifecycleConfig setStatusFile(File statusFile)
return this;
}
- public String getParentStreamName()
+ public File getPortFile()
{
- return parentStreamName;
+ return portFile;
}
- public ExecutorLifecycleConfig setParentStreamName(String parentStreamName)
+ public ExecutorLifecycleConfig setPortFile(File portFile)
{
- this.parentStreamName = parentStreamName;
+ this.portFile = portFile;
return this;
}
-
- public InputStream getParentStream()
- {
- if ("stdin".equals(parentStreamName)) {
- return System.in;
- }
- else {
- throw new ISE("Unknown stream name[%s]", parentStreamName);
- }
- }
}
diff --git a/server/src/main/java/io/druid/server/http/ShutdownResource.java b/server/src/main/java/io/druid/server/http/ShutdownResource.java
new file mode 100644
index 000000000000..4658669c4bc4
--- /dev/null
+++ b/server/src/main/java/io/druid/server/http/ShutdownResource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.http;
+
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.common.logger.Logger;
+
+import javax.ws.rs.DELETE;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * REST resource that lets a parent process ask this JVM to exit. The exit is scheduled one
+ * second out so the HTTP response can be delivered before the process dies.
+ */
+@Path("/shutdown")
+public class ShutdownResource
+{
+  private static final Logger log = new Logger(ShutdownResource.class);
+
+  private final Lifecycle lifecycle;
+  private final AtomicBoolean shutdownScheduled = new AtomicBoolean(false);
+  private final ListeningScheduledExecutorService executorService =
+      MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+
+  @Inject
+  public ShutdownResource(
+      Lifecycle lifecycle
+  )
+  {
+    this.lifecycle = lifecycle;
+  }
+
+  /**
+   * Schedules a JVM exit. Idempotent: repeated DELETEs after the first are no-ops that still
+   * return 202 (previously a second call hit the already-shut-down executor and threw
+   * RejectedExecutionException, surfacing as a 500).
+   */
+  @DELETE
+  public Response shutDown()
+  {
+    log.info("Received shutdown request");
+    if (shutdownScheduled.compareAndSet(false, true)) {
+      executorService.schedule(
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              System.exit(0);
+            }
+          },
+          1,
+          TimeUnit.SECONDS
+      );
+      executorService.shutdown();
+    }
+    return Response.status(Response.Status.ACCEPTED).build();
+  }
+}
diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java
index 4b5728b4c75d..251f5ee92f41 100644
--- a/services/src/main/java/io/druid/cli/CliPeon.java
+++ b/services/src/main/java/io/druid/cli/CliPeon.java
@@ -70,6 +70,7 @@
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.server.QueryResource;
+import io.druid.server.http.ShutdownResource;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
@@ -86,8 +87,8 @@
)
public class CliPeon extends GuiceRunnable
{
- @Arguments(description = "task.json status.json", required = true)
- public List taskAndStatusFile;
+ @Arguments(description = "task.json status.json task.port", required = true)
+ public List taskAndStatusFiles;
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String nodeType = "indexer-executor";
@@ -125,7 +126,7 @@ public void configure(Binder binder)
handlerProviderBinder.addBinding("noop")
.to(NoopChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
-
+
binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
@@ -148,8 +149,9 @@ public void configure(Binder binder)
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(
new ExecutorLifecycleConfig()
- .setTaskFile(new File(taskAndStatusFile.get(0)))
- .setStatusFile(new File(taskAndStatusFile.get(1)))
+ .setTaskFile(new File(taskAndStatusFiles.get(0)))
+ .setStatusFile(new File(taskAndStatusFiles.get(1)))
+ .setPortFile(new File(taskAndStatusFiles.get(2)))
);
binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
@@ -166,6 +168,7 @@ public void configure(Binder binder)
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, ChatHandlerResource.class);
+ Jerseys.addResource(binder, ShutdownResource.class);
LifecycleModule.register(binder, QueryResource.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
diff --git a/services/src/test/java/io/druid/indexing/overlord/autoscaling/ForkingTaskRunnerTest.java b/services/src/test/java/io/druid/indexing/overlord/autoscaling/ForkingTaskRunnerTest.java
new file mode 100644
index 000000000000..294f66fdb7b1
--- /dev/null
+++ b/services/src/test/java/io/druid/indexing/overlord/autoscaling/ForkingTaskRunnerTest.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord.autoscaling;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteSink;
+import com.google.common.io.Closer;
+import com.google.common.io.FileBackedOutputStream;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.metamx.common.StringUtils;
+import com.metamx.common.logger.Logger;
+import com.metamx.http.client.HttpClient;
+import io.druid.guice.GuiceInjectors;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.annotations.Global;
+import io.druid.guice.annotations.Json;
+import io.druid.guice.annotations.Self;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.config.TaskConfig;
+import io.druid.indexing.common.task.AbstractTask;
+import io.druid.indexing.overlord.ForkingTaskRunner;
+import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
+import io.druid.indexing.worker.config.WorkerConfig;
+import io.druid.initialization.DruidModule;
+import io.druid.initialization.Initialization;
+import io.druid.server.DruidNode;
+import io.druid.tasklogs.TaskLogPusher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Ignore // Takes too long to run on Travis
+public class ForkingTaskRunnerTest
+{
+ private final Closer closer = Closer.create();
+ @Rule
+ final public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ private String taskId;
+ private File taskBaseDir;
+ private File taskDir;
+ private ForkingTaskRunner forkingTaskRunner;
+ private static final Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(), ImmutableList.of(
+ new com.google.inject.Module()
+ {
+ @Override
+ public void configure(Binder binder)
+ {
+ JsonConfigProvider.bindInstance(
+ binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null)
+ );
+ }
+ }
+ )
+ );
+ private HttpClient httpClient;
+ private File watchFile;
+ private ObjectMapper mapper;
+ private final AtomicInteger pushTaskLogCalls = new AtomicInteger(0);
+
+
+ private ForkingTaskRunner makeForkingTaskRunner(Integer timeout) throws IOException
+ {
+ final Properties properties = new Properties();
+ properties.setProperty("druid.processing.numThreads", "1");
+ final ForkingTaskRunnerConfig config = new ForkingTaskRunnerConfig();
+ if (timeout != null) {
+ config.setSoftShutdownTimeLimit(timeout);
+ }
+ return new ForkingTaskRunner(
+ config,
+ new TaskConfig(
+ temporaryFolder.newFolder().getAbsolutePath(),
+ taskBaseDir.getAbsolutePath(),
+ "/tmp",
+ null,
+ null
+ ),
+ new WorkerConfig(),
+ properties,
+ new TaskLogPusher()
+ {
+ @Override
+ public void pushTaskLog(String taskid, File logFile) throws IOException
+ {
+ pushTaskLogCalls.incrementAndGet();
+ }
+ },
+ mapper,
+ new DruidNode("test/service", "localhost", -1),
+ httpClient
+ )
+ {
+ @Override
+ public void stop()
+ {
+ super.stop();
+ // Since we don't kill JVM between unit test instances, we want to make sure futures are trashed.
+ exec.shutdownNow();
+ }
+ };
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ mapper = injector.getBinding(Key.get(ObjectMapper.class, Json.class)).getProvider().get();
+ watchFile = new File(temporaryFolder.newFolder(), "watchFile");
+ taskId = "BusyTaskID-" + UUID.randomUUID().toString();
+ taskBaseDir = temporaryFolder.newFolder();
+ taskDir = new File(taskBaseDir, taskId);
+ httpClient = injector.getInstance(Key.get(HttpClient.class, Global.class));
+ forkingTaskRunner = makeForkingTaskRunner(30_000);
+ }
+
+ @After
+ public void tearDown() throws IOException
+ {
+ watchFile.delete();
+ forkingTaskRunner.stop();
+ closer.close();
+ }
+
+ @Test(timeout = 600_000)
+ public void testForkingCanKill() throws IOException, InterruptedException, ExecutionException
+ {
+ ListenableFuture<TaskStatus> future = waitForTaskStart(600_000);
+ Assert.assertFalse(forkingTaskRunner.getRunningTasks().isEmpty());
+ forkingTaskRunner.shutdown(taskId);
+ waitForEmptyTaskList(1_000);
+ Assert.assertTrue(future.get().isFailure());
+ Assert.assertFalse(taskDir.exists());
+ }
+
+ @Test(timeout = 600_000)
+ public void testForking() throws IOException, InterruptedException, ExecutionException
+ {
+ final ListenableFuture<TaskStatus> future = waitForTaskStart(60_000);
+ Assert.assertTrue(watchFile.delete());
+ Assert.assertTrue(future.get().isSuccess());
+ Assert.assertFalse(taskDir.exists());
+ }
+
+ @Test(timeout = 600_000)
+ public void testKillingForkedJobNewRunner() throws IOException, InterruptedException, ExecutionException
+ {
+ forkingTaskRunner = makeForkingTaskRunner(1_000);
+ forkingTaskRunner.start();
+ ListenableFuture<TaskStatus> future = waitForTaskStart(60_000);
+ Assert.assertFalse(forkingTaskRunner.getRunningTasks().isEmpty());
+ forkingTaskRunner.shutdown(taskId);
+ waitForEmptyTaskList(1_000);
+ Assert.assertTrue(future.get().isFailure());
+ Assert.assertFalse(taskDir.exists());
+ }
+
+ @Test(timeout = 600_000)
+ public void testStartingNewRunner() throws IOException, InterruptedException, ExecutionException
+ {
+ waitForTaskStart(600_000);
+ Assert.assertTrue(taskDir.exists());
+ Assert.assertFalse(forkingTaskRunner.getRunningTasks().isEmpty());
+ forkingTaskRunner.stop();
+ Assert.assertTrue(taskDir.exists());
+ forkingTaskRunner = makeForkingTaskRunner(600_000);
+ forkingTaskRunner.start();
+ // Should pick up prior task
+ Assert.assertFalse(forkingTaskRunner.getRunningTasks().isEmpty());
+ ListenableFuture<TaskStatus> future = forkingTaskRunner.run(new BusyTask(taskId, null, 60_000L));
+ // Signal task to exit
+ Assert.assertTrue(watchFile.delete());
+ Assert.assertTrue(future.get().isSuccess());
+ // Wait for task to clean up itself
+ if (taskDir.exists()) {
+ try (WatchService watchService = taskDir.toPath().getFileSystem().newWatchService()) {
+ taskDir.toPath().getParent().register(watchService, StandardWatchEventKinds.ENTRY_DELETE);
+ while (taskDir.exists()) {
+ Assert.assertNotNull(watchService.poll(1, TimeUnit.MINUTES));
+ }
+ }
+ }
+ Assert.assertFalse(taskDir.exists());
+ }
+
+ @Test
+ public void testBadPort() throws IOException
+ {
+ final File attemptDir = new File(taskDir, "attempt_dir");
+ Assert.assertTrue(attemptDir.mkdirs());
+
+ final File portFile = new File(attemptDir, "task.port");
+ writeStringToFile("bad string", portFile);
+
+ final File taskFile = new File(attemptDir, "task.json");
+ writeStringToFile(
+ mapper.writeValueAsString(
+ new BusyTask(
+ taskId,
+ watchFile.getAbsolutePath(),
+ 100
+ )
+ ), taskFile
+ );
+
+ final File logFile = new File(attemptDir, "task.log");
+ Assert.assertTrue(logFile.createNewFile());
+
+ final File statusFile = new File(attemptDir, "status.json");
+ Assert.assertTrue(statusFile.createNewFile());
+
+ forkingTaskRunner.start();
+ waitForEmptyTaskList(1_000);
+ }
+
+ @Test
+ public void testBadTask() throws IOException
+ {
+ final File attemptDir = new File(taskDir, "attempt_dir");
+ Assert.assertTrue(attemptDir.mkdirs());
+
+ final File portFile = new File(attemptDir, "task.port");
+ writeStringToFile("12345", portFile);
+
+ final File taskFile = new File(attemptDir, "task.json");
+ writeStringToFile(
+ mapper.writeValueAsString(
+ new BusyTask(
+ taskId,
+ watchFile.getAbsolutePath(),
+ 100
+ )
+ ), taskFile
+ );
+
+ try (FileOutputStream fos = new FileOutputStream(taskFile)) {
+ fos.write(new byte[]{1, 2, 3, 4, 5, 6});
+ }
+
+ final File logFile = new File(attemptDir, "task.log");
+ Assert.assertTrue(logFile.createNewFile());
+
+ final File statusFile = new File(attemptDir, "status.json");
+ Assert.assertTrue(statusFile.createNewFile());
+
+ forkingTaskRunner.start();
+ waitForEmptyTaskList(1_000);
+ }
+
+
+ @Test
+ public void testTaskHadFinished() throws IOException
+ {
+ final File attemptDir = new File(taskDir, "attempt_dir");
+ Assert.assertTrue(attemptDir.mkdirs());
+
+ final File taskFile = new File(attemptDir, "task.json");
+ writeStringToFile(
+ mapper.writeValueAsString(
+ new BusyTask(
+ taskId,
+ watchFile.getAbsolutePath(),
+ 100
+ )
+ ), taskFile
+ );
+
+ final File logFile = new File(attemptDir, "task.log");
+ Assert.assertTrue(logFile.createNewFile());
+
+ final File statusFile = new File(attemptDir, "status.json");
+ writeStringToFile(mapper.writeValueAsString(TaskStatus.success(taskId)), statusFile);
+
+ Assert.assertEquals(0, pushTaskLogCalls.get());
+ forkingTaskRunner.start();
+ waitForEmptyTaskList(1_000);
+ Assert.assertEquals(1, pushTaskLogCalls.get());
+ }
+
+ private void waitForEmptyTaskList(long timeout)
+ {
+ long start = System.currentTimeMillis();
+ while(!forkingTaskRunner.getRunningTasks().isEmpty())
+ {
+ Assert.assertTrue(System.currentTimeMillis() - start < timeout);
+ }
+ }
+
+ private void writeStringToFile(String string, final File file) throws IOException
+ {
+ new ByteSink()
+ {
+ @Override
+ public OutputStream openStream() throws IOException
+ {
+ return new FileOutputStream(file);
+ }
+ }.write(
+ StringUtils.toUtf8(string)
+ );
+ Assert.assertTrue(file.exists());
+ Assert.assertTrue(file.length() > 0);
+ }
+
+ private ListenableFuture<TaskStatus> waitForTaskStart(long sleep) throws InterruptedException, IOException
+ {
+ if (!forkingTaskRunner.isStarted()) {
+ forkingTaskRunner.start();
+ }
+ final Path watchPath = watchFile.toPath().getParent();
+ Assert.assertFalse(watchFile.exists());
+ try (final WatchService watchService = watchPath.getFileSystem().newWatchService()) {
+ closer.register(watchService);
+ watchPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+ final ListenableFuture<TaskStatus> future = forkingTaskRunner.run(
+ new BusyTask(
+ taskId,
+ watchFile.getAbsolutePath(),
+ sleep
+ )
+ );
+ while (!watchFile.exists()) {
+ watchService.take();
+ }
+ Assert.assertTrue(watchFile.exists());
+ return future;
+ }
+ }
+
+ @JsonTypeName("busyTask")
+ public static class BusyTask extends AbstractTask
+ {
+ private static final Logger log = new Logger(BusyTask.class);
+ private final String lockFile;
+ private final long sleep;
+
+ public BusyTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("lockFile") String lockFile,
+ @JsonProperty("sleep") long sleep
+ )
+ {
+ super(id == null ? "testTask-" + UUID.randomUUID().toString() : id, "noDataSource");
+ this.lockFile = lockFile;
+ this.sleep = sleep;
+ }
+
+ @JsonProperty("lockFile")
+ public String getLockFile()
+ {
+ return lockFile;
+ }
+
+ @JsonProperty("sleep")
+ public long getSleep()
+ {
+ return sleep;
+ }
+
+ @Override
+ public String getType()
+ {
+ return "busyTask";
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception
+ {
+ return true;
+ }
+
+ @Override
+ public TaskStatus run(TaskToolbox toolbox) throws Exception
+ {
+ log.info("Creating file at [%s]", getLockFile());
+ File file = new File(getLockFile());
+ if (!file.createNewFile()) {
+ log.error("Error creating file at [%s]", file);
+ }
+ final Path path = file.toPath();
+ while (file.exists()) {
+ try (WatchService service = path.getFileSystem().newWatchService()) {
+ path.getParent().register(service, StandardWatchEventKinds.ENTRY_DELETE);
+ if (file.exists()) {
+ WatchKey key = service.poll(sleep, TimeUnit.MILLISECONDS);
+ if (key == null) {
+ log.error("Ran out of time waiting for [%s]", path);
+ return TaskStatus.failure(getId());
+ }
+ log.info("Delete event found for [%s]", path);
+ }
+ }
+ }
+ return TaskStatus.success(getId());
+ }
+ }
+
+
+ public static class ForkingTaskRunnerTestModule implements DruidModule
+ {
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ final SimpleModule module = new SimpleModule("ForkingTaskRunnerTestModule");
+ module.registerSubtypes(BusyTask.class);
+ return ImmutableList.of(module);
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ // NOOP
+ }
+ }
+}
diff --git a/services/src/test/resources/META-INF/services/io.druid.initialization.DruidModule b/services/src/test/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 000000000000..566c2aed4604
--- /dev/null
+++ b/services/src/test/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1,20 @@
+#
+# Licensed to Metamarkets Group Inc. (Metamarkets) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. Metamarkets licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+io.druid.indexing.overlord.autoscaling.ForkingTaskRunnerTest$ForkingTaskRunnerTestModule
\ No newline at end of file