diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskLocation.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskLocation.java new file mode 100644 index 000000000000..2eed2dcf6d6e --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLocation.java @@ -0,0 +1,85 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class TaskLocation +{ + private static final TaskLocation UNKNOWN = new TaskLocation(null, -1); + + private final String host; + private final int port; + + public static TaskLocation create(String host, int port) + { + return new TaskLocation(host, port); + } + + public static TaskLocation unknown() + { + return TaskLocation.UNKNOWN; + } + + @JsonCreator + public TaskLocation( + @JsonProperty("host") String host, + @JsonProperty("port") int port + ) + { + this.host = host; + this.port = port; + } + + @JsonProperty + public String getHost() + { + return host; + } + + @JsonProperty + public int getPort() + { + return port; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskLocation that = (TaskLocation) o; + return port == that.port && + Objects.equals(host, that.host); + } + + @Override + public int hashCode() + { + return Objects.hash(host, port); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 32d421289e6d..6ff92235c415 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -53,6 +53,7 @@ import com.metamx.emitter.EmittingLogger; import io.druid.concurrent.Execs; import io.druid.guice.annotations.Self; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; @@ -81,6 +82,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** @@ -99,6 +102,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer private final ListeningExecutorService exec; private final ObjectMapper jsonMapper; private final PortFinder portFinder; + private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); // Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. private final Map tasks = Maps.newConcurrentMap(); @@ -171,6 +175,19 @@ public List>> restore() return retVal; } + public void registerListener(TaskRunnerListener listener, Executor executor) + { + final Pair listenerPair = Pair.of(listener, executor); + + synchronized (tasks) { + for (ForkingTaskRunnerWorkItem item : tasks.values()) { + TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation()); + } + + listeners.add(listenerPair); + } + } + @Override public ListenableFuture run(final Task task) { @@ -191,6 +208,7 @@ public TaskStatus call() final File attemptDir = new File(taskDir, attemptUUID); final ProcessHolder processHolder; + final String childHost = node.getHost(); final int childPort; final int childChatHandlerPort; @@ -203,6 +221,8 @@ public TaskStatus call() childChatHandlerPort = -1; } + final TaskLocation taskLocation = TaskLocation.create(childHost, childPort); + try { final Closer closer = Closer.create(); try { @@ -235,7 +255,6 @@ public TaskStatus call() } final List command = Lists.newArrayList(); - final String childHost = node.getHost(); final String taskClasspath; if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { taskClasspath = Joiner.on(File.pathSeparator).join( @@ -250,7 +269,10 @@ public TaskStatus call() command.add("-cp"); command.add(taskClasspath); - Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts(), jsonMapper)); + Iterables.addAll( + command, + new QuotableWhiteSpaceSplitter(config.getJavaOpts(), jsonMapper) + ); // Override task specific javaOpts Object taskJavaOpts = task.getContextValue( @@ -333,8 +355,8 @@ public TaskStatus call() * Users are highly suggested to be set in druid.indexer.runner.javaOpts * See io.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int) * for more information - command.add("-XX:+UseThreadPriorities"); - command.add("-XX:ThreadPriorityPolicy=42"); + command.add("-XX:+UseThreadPriorities"); + command.add("-XX:ThreadPriorityPolicy=42"); */ if (config.isSeparateIngestionEndpoint()) { @@ -370,13 +392,16 @@ public TaskStatus call() taskWorkItem.processHolder = new ProcessHolder( new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), logFile, - childPort + taskLocation.getHost(), + taskLocation.getPort() ); processHolder = taskWorkItem.processHolder; processHolder.registerWithCloser(closer); } + TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); + log.info("Logging task %s output to: %s", task.getId(), logFile); boolean runFailed = true; @@ -673,18 +698,30 @@ public Task getTask() { return task; } + + @Override + public TaskLocation getLocation() + { + if (processHolder == null) { + return TaskLocation.unknown(); + } else { + return TaskLocation.create(processHolder.host, processHolder.port); + } + } } private static class ProcessHolder { private final Process process; private final File logFile; + private final String host; private final int port; - private ProcessHolder(Process process, File logFile, int port) + private ProcessHolder(Process process, File logFile, String host, int port) { this.process = process; this.logFile = logFile; + this.host = host; this.port = port; } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 1a290c8bb2c5..9dcdbe2c56ef 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -56,6 +56,7 @@ import com.metamx.http.client.response.StatusResponseHolder; import io.druid.curator.CuratorUtils; import io.druid.curator.cache.PathChildrenCacheFactory; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; @@ -91,7 +92,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -145,6 +148,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy. private final ConcurrentMap lazyWorkers = new ConcurrentHashMap<>(); + // task runner listeners + private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final Object statusLock = new Object(); @@ -327,6 +332,24 @@ public List>> restore() return ImmutableList.of(); } + @Override + public void registerListener(TaskRunnerListener listener, Executor executor) + { + final Pair listenerPair = Pair.of(listener, executor); + + synchronized (statusLock) { + for (Map.Entry entry : runningTasks.entrySet()) { + TaskRunnerUtils.notifyLocationChanged( + ImmutableList.of(listenerPair), + entry.getKey(), + entry.getValue().getLocation() + ); + } + + listeners.add(listenerPair); + } + } + @Override public Collection getWorkers() { @@ -517,7 +540,7 @@ private URL makeWorkerURL(Worker worker, String path) private RemoteTaskRunnerWorkItem addPendingTask(final Task task) { log.info("Added pending task %s", task.getId()); - final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null); + final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null, null); pendingTaskPayloads.put(task.getId(), task); pendingTasks.put(task.getId(), taskRunnerWorkItem); runPendingTasks(); @@ -706,7 +729,7 @@ private boolean announceTask( return false; } - RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker()); + RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null); runningTasks.put(task.getId(), newWorkItem); log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost()); @@ -783,15 +806,16 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th case CHILD_ADDED: case CHILD_UPDATED: taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - final TaskStatus taskStatus = jsonMapper.readValue( - event.getData().getData(), TaskStatus.class + final TaskAnnouncement announcement = jsonMapper.readValue( + event.getData().getData(), TaskAnnouncement.class ); log.info( - "Worker[%s] wrote %s status for task: %s", + "Worker[%s] wrote %s status for task [%s] on [%s]", zkWorker.getWorker().getHost(), - taskStatus.getStatusCode(), - taskId + announcement.getTaskStatus().getStatusCode(), + taskId, + announcement.getTaskLocation() ); // Synchronizing state with ZK @@ -803,7 +827,8 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th } else { final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem( taskId, - zkWorker.getWorker() + zkWorker.getWorker(), + TaskLocation.unknown() ); final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( taskId, @@ -821,8 +846,13 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th } } - if (taskStatus.isComplete()) { - taskComplete(taskRunnerWorkItem, zkWorker, taskStatus); + if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) { + taskRunnerWorkItem.setLocation(announcement.getTaskLocation()); + TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); + } + + if (announcement.getTaskStatus().isComplete()) { + taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus()); runPendingTasks(); } break; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java index 502006edd383..258f256159d8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java @@ -20,6 +20,7 @@ package io.druid.indexing.overlord; import com.google.common.util.concurrent.SettableFuture; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.worker.Worker; import org.joda.time.DateTime; @@ -30,34 +31,39 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem { private final SettableFuture result; private final Worker worker; + private TaskLocation location; public RemoteTaskRunnerWorkItem( String taskId, - Worker worker + Worker worker, + TaskLocation location ) { - this(taskId, SettableFuture.create(), worker); + this(taskId, SettableFuture.create(), worker, location); } public RemoteTaskRunnerWorkItem( String taskId, DateTime createdTime, DateTime queueInsertionTime, - Worker worker + Worker worker, + TaskLocation location ) { - this(taskId, SettableFuture.create(), createdTime, queueInsertionTime, worker); + this(taskId, SettableFuture.create(), createdTime, queueInsertionTime, worker, location); } private RemoteTaskRunnerWorkItem( String taskId, SettableFuture result, - Worker worker + Worker worker, + TaskLocation location ) { super(taskId, result); this.result = result; this.worker = worker; + this.location = location == null ? TaskLocation.unknown() : location; } private RemoteTaskRunnerWorkItem( @@ -65,12 +71,25 @@ private RemoteTaskRunnerWorkItem( SettableFuture result, DateTime createdTime, DateTime queueInsertionTime, - Worker worker + Worker worker, + TaskLocation location ) { super(taskId, result, createdTime, queueInsertionTime); this.result = result; this.worker = worker; + this.location = location == null ? TaskLocation.unknown() : location; + } + + public void setLocation(TaskLocation location) + { + this.location = location; + } + + @Override + public TaskLocation getLocation() + { + return location; } public Worker getWorker() @@ -83,14 +102,20 @@ public void setResult(TaskStatus status) result.set(status); } - @Override public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time) { - return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), time, worker); + return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), time, worker, location); } - public RemoteTaskRunnerWorkItem withWorker(Worker theWorker) + public RemoteTaskRunnerWorkItem withWorker(Worker theWorker, TaskLocation location) { - return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), getQueueInsertionTime(), theWorker); + return new RemoteTaskRunnerWorkItem( + getTaskId(), + result, + getCreatedTime(), + getQueueInsertionTime(), + theWorker, + location + ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java index 39f3629cddba..4de22ba1962d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.List; +import java.util.concurrent.Executor; /** * Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}. @@ -39,7 +40,21 @@ public interface TaskRunner * Some task runners can restart previously-running tasks after being bounced. This method does that, and returns * the list of tasks (and status futures). */ - public List>> restore(); + List>> restore(); + + /** + * Register a listener with this task runner. On registration, the listener will get events corresponding to the + * current state of known tasks. + * + * Listener notifications are submitted to the executor in the order they occur, but it is up to the executor + * to decide when to actually run the notifications. If your listeners will not block, feel free to use a + * same-thread executor. Listeners that may block should use a separate executor, generally a single-threaded + * one with a fifo queue so the order of notifications is retained. + * + * @param listener the listener + * @param executor executor to run callbacks in + */ + void registerListener(TaskRunnerListener listener, Executor executor); /** * Run a task. The returned status should be some kind of completed status. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerListener.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerListener.java new file mode 100644 index 000000000000..0c6a0de1fb1a --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord; + +import io.druid.indexing.common.TaskLocation; + +import java.util.concurrent.Executor; + +/** + * Listener to be registered with {@link TaskRunner#registerListener(TaskRunnerListener, Executor)}. + */ +public interface TaskRunnerListener +{ + /** + * Called when the location of a task has changed. The task may not actually be done starting up when + * this notification arrives, so it may not be listening at this location yet. + */ + void locationChanged(String taskId, TaskLocation newLocation); +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerUtils.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerUtils.java new file mode 100644 index 000000000000..6fdcd881319d --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord; + +import com.metamx.common.Pair; +import com.metamx.emitter.EmittingLogger; +import io.druid.indexing.common.TaskLocation; + +import java.util.concurrent.Executor; + +public class TaskRunnerUtils +{ + private static final EmittingLogger log = new EmittingLogger(TaskRunnerUtils.class); + + public static void notifyLocationChanged( + final Iterable> listeners, + final String taskId, + final TaskLocation location + ) + { + log.info("Task [%s] location changed to [%s].", taskId, location); + for (final Pair listener : listeners) { + try { + listener.rhs.execute( + new Runnable() + { + @Override + public void run() + { + listener.lhs.locationChanged(taskId, location); + } + } + ); + } + catch (Exception e) { + log.makeAlert(e, "Unable to notify task listener") + .addData("taskId", taskId) + .addData("taskLocation", location) + .addData("listener", listener.toString()) + .emit(); + } + } + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java index 2f59bf67b30a..b6cd26044576 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java @@ -21,16 +21,15 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ComparisonChain; import com.google.common.util.concurrent.ListenableFuture; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import org.joda.time.DateTime; -import org.joda.time.DateTimeComparator; /** * A holder for a task and different components associated with the task */ -public class TaskRunnerWorkItem implements Comparable +public abstract class TaskRunnerWorkItem { private final String taskId; private final ListenableFuture result; @@ -82,19 +81,7 @@ public DateTime getQueueInsertionTime() return queueInsertionTime; } - public TaskRunnerWorkItem withQueueInsertionTime(DateTime time) - { - return new TaskRunnerWorkItem(taskId, result, createdTime, time); - } - - @Override - public int compareTo(TaskRunnerWorkItem taskRunnerWorkItem) - { - return ComparisonChain.start() - .compare(createdTime, taskRunnerWorkItem.getCreatedTime(), DateTimeComparator.getInstance()) - .compare(taskId, taskRunnerWorkItem.getTaskId()) - .result(); - } + public abstract TaskLocation getLocation(); @Override public String toString() @@ -104,6 +91,7 @@ public String toString() ", result=" + result + ", createdTime=" + createdTime + ", queueInsertionTime=" + queueInsertionTime + + ", location=" + getLocation() + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index b7a78c2cb9d9..f26899bce04d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -37,6 +37,8 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.concurrent.Execs; import io.druid.concurrent.TaskThreadPriority; +import io.druid.guice.annotations.Self; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; @@ -48,10 +50,12 @@ import io.druid.query.QueryRunner; import io.druid.query.QuerySegmentWalker; import io.druid.query.SegmentDescriptor; +import io.druid.server.DruidNode; import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -59,6 +63,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** @@ -71,19 +77,25 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private final TaskToolboxFactory toolboxFactory; private final TaskConfig taskConfig; private final ConcurrentMap exec = new ConcurrentHashMap<>(); - private final Set runningItems = new ConcurrentSkipListSet<>(); + private final Set runningItems = new ConcurrentSkipListSet<>( + ThreadPoolTaskRunnerWorkItem.COMPARATOR + ); + private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final ServiceEmitter emitter; + private final TaskLocation location; @Inject public ThreadPoolTaskRunner( TaskToolboxFactory toolboxFactory, TaskConfig taskConfig, - ServiceEmitter emitter + ServiceEmitter emitter, + @Self DruidNode node ) { this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory"); this.taskConfig = taskConfig; this.emitter = Preconditions.checkNotNull(emitter, "emitter"); + this.location = TaskLocation.create(node.getHost(), node.getPort()); } @Override @@ -92,6 +104,20 @@ public List>> restore() return ImmutableList.of(); } + @Override + public void registerListener(TaskRunnerListener listener, Executor executor) + { + final Pair listenerPair = Pair.of(listener, executor); + + // Location never changes for an existing task, so it's ok to add the listener first and then issue bootstrap + // callbacks without any special synchronization. + + listeners.add(listenerPair); + for (ThreadPoolTaskRunnerWorkItem item : runningItems) { + TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation()); + } + } + private static ListeningExecutorService buildExecutorService(int priority) { return MoreExecutors.listeningDecorator( @@ -182,10 +208,10 @@ public ListenableFuture run(final Task task) final TaskToolbox toolbox = toolboxFactory.build(task); final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY); int taskPriority = 0; - if(taskPriorityObj != null){ - if(taskPriorityObj instanceof Number) { + if (taskPriorityObj != null) { + if (taskPriorityObj instanceof Number) { taskPriority = ((Number) taskPriorityObj).intValue(); - } else if(taskPriorityObj instanceof String) { + } else if (taskPriorityObj instanceof String) { try { taskPriority = Integer.parseInt(taskPriorityObj.toString()); } @@ -203,8 +229,16 @@ public ListenableFuture run(final Task task) } } final ListenableFuture statusFuture = exec.get(taskPriority) - .submit(new ThreadPoolTaskRunnerCallable(task, toolbox)); - final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(task, statusFuture); + .submit(new ThreadPoolTaskRunnerCallable( + task, + location, + toolbox + )); + final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem( + task, + location, + statusFuture + ); runningItems.add(taskRunnerWorkItem); Futures.addCallback( statusFuture, new FutureCallback() @@ -305,31 +339,54 @@ private QueryRunner getQueryRunnerImpl(Query query) private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem { + private static final Comparator COMPARATOR = new Comparator() + { + @Override + public int compare( + ThreadPoolTaskRunnerWorkItem lhs, + ThreadPoolTaskRunnerWorkItem rhs + ) + { + return lhs.getTaskId().compareTo(rhs.getTaskId()); + } + }; + private final Task task; + private final TaskLocation location; private ThreadPoolTaskRunnerWorkItem( Task task, + TaskLocation location, ListenableFuture result ) { super(task.getId(), result); this.task = task; + this.location = location; } public Task getTask() { return task; } + + @Override + public TaskLocation getLocation() + { + return location; + } } - private static class ThreadPoolTaskRunnerCallable implements Callable + private class ThreadPoolTaskRunnerCallable implements Callable { private final Task task; + private final TaskLocation location; private final TaskToolbox toolbox; - public ThreadPoolTaskRunnerCallable(Task task, TaskToolbox toolbox) + public ThreadPoolTaskRunnerCallable(Task task, TaskLocation location, TaskToolbox toolbox) { this.task = task; + this.location = location; this.toolbox = toolbox; } @@ -342,6 +399,11 @@ public TaskStatus call() try { log.info("Running task: %s", task.getId()); + TaskRunnerUtils.notifyLocationChanged( + listeners, + task.getId(), + location + ); status = task.run(toolbox); } catch (InterruptedException e) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index bc42e30785af..59c2ae307caa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -31,10 +31,10 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.common.logger.Logger; - import io.druid.audit.AuditInfo; import io.druid.audit.AuditManager; import io.druid.common.config.JacksonConfigManager; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; @@ -50,7 +50,6 @@ import io.druid.metadata.EntryExistsException; import io.druid.tasklogs.TaskLogStreamer; import io.druid.timeline.DataSegment; - import org.joda.time.DateTime; import org.joda.time.Interval; @@ -67,8 +66,6 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; - import java.io.IOException; import java.util.Collection; import java.util.List; @@ -330,6 +327,13 @@ public String apply(final TaskRunnerWorkItem workItem) new DateTime(0), new DateTime(0) ) + { + @Override + public TaskLocation getLocation() + { + return TaskLocation.unknown(); + } + } ); } } @@ -390,7 +394,8 @@ public TaskResponseObject apply(TaskStatus taskStatus) taskStatus.getId(), new DateTime(0), new DateTime(0), - Optional.of(taskStatus) + Optional.of(taskStatus), + TaskLocation.unknown() ); } } @@ -489,7 +494,8 @@ public TaskResponseObject apply(TaskRunnerWorkItem workItem) workItem.getTaskId(), workItem.getCreatedTime(), workItem.getQueueInsertionTime(), - Optional.absent() + Optional.absent(), + workItem.getLocation() ); } } @@ -522,44 +528,27 @@ private Response asLeaderWith(Optional x, Function f) } } - private static class TaskResponseObject + static class TaskResponseObject { private final String id; private final DateTime createdTime; private final DateTime queueInsertionTime; private final Optional status; + private final TaskLocation location; private TaskResponseObject( String id, DateTime createdTime, DateTime queueInsertionTime, - Optional status + Optional status, + TaskLocation location ) { this.id = id; this.createdTime = createdTime; this.queueInsertionTime = queueInsertionTime; this.status = status; - } - - public String getId() - { - return id; - } - - public DateTime getCreatedTime() - { - return createdTime; - } - - public DateTime getQueueInsertionTime() - { - return queueInsertionTime; - } - - public Optional getStatus() - { - return status; + this.location = location; } @JsonValue @@ -579,6 +568,9 @@ public Map toJson() data.put("duration", status.get().getDuration()); } } + if (location != null) { + data.put("location", location); + } return data; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/TaskAnnouncement.java b/indexing-service/src/main/java/io/druid/indexing/worker/TaskAnnouncement.java index 42ec0307329e..a59e79228051 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/TaskAnnouncement.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/TaskAnnouncement.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; @@ -33,16 +34,17 @@ public class TaskAnnouncement { private final TaskStatus taskStatus; private final TaskResource taskResource; + private final TaskLocation taskLocation; - public static TaskAnnouncement create(Task task, TaskStatus status) + public static TaskAnnouncement create(Task task, TaskStatus status, TaskLocation location) { - return create(task.getId(), task.getTaskResource(), status); + return create(task.getId(), task.getTaskResource(), status, location); } - public static TaskAnnouncement create(String taskId, TaskResource resource, TaskStatus status) + public static TaskAnnouncement create(String taskId, TaskResource resource, TaskStatus status, TaskLocation location) { Preconditions.checkArgument(status.getId().equals(taskId), "task id == status id"); - return new TaskAnnouncement(null, null, status, resource); + return new TaskAnnouncement(null, null, status, resource, location); } @JsonCreator @@ -50,7 +52,8 @@ private TaskAnnouncement( @JsonProperty("id") String taskId, @JsonProperty("status") TaskStatus.Status status, @JsonProperty("taskStatus") TaskStatus taskStatus, - @JsonProperty("taskResource") TaskResource taskResource + @JsonProperty("taskResource") TaskResource taskResource, + @JsonProperty("taskLocation") TaskLocation taskLocation ) { if (taskStatus != null) { @@ -60,6 +63,7 @@ private TaskAnnouncement( this.taskStatus = TaskStatus.fromCode(taskId, status); } this.taskResource = taskResource == null ? new TaskResource(this.taskStatus.getId(), 1) : taskResource; + this.taskLocation = taskLocation == null ? TaskLocation.unknown() : taskLocation; } // Can be removed when backwards compat is no longer needed @@ -89,4 +93,10 @@ public TaskResource getTaskResource() { return taskResource; } + + @JsonProperty("taskLocation") + public TaskLocation getTaskLocation() + { + return taskLocation; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java index 59d68220bfd7..50a130da8723 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -167,7 +167,7 @@ public Worker getWorker() return worker; } - public void unannounceTask(String taskId) + public void removeTaskRunZnode(String taskId) { try { curatorFramework.delete().guaranteed().forPath(getTaskPathForId(taskId)); @@ -177,7 +177,7 @@ public void unannounceTask(String taskId) } } - public void updateAnnouncement(TaskAnnouncement announcement) + public void updateTaskStatusAnnouncement(TaskAnnouncement announcement) { synchronized (lock) { if (!started) { diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java index 3ecf2a7b521e..56c3d73bc9a4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java @@ -20,35 +20,39 @@ package io.druid.indexing.worker; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.client.util.Sets; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; import io.druid.concurrent.Execs; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TaskRunner; -import io.druid.indexing.worker.config.WorkerConfig; +import io.druid.indexing.overlord.TaskRunnerListener; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; /** * The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be * created that waits for new tasks. Tasks are executed as soon as they are seen. - *

- * The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for - * realtime index tasks. */ public class WorkerTaskMonitor { @@ -60,15 +64,19 @@ public class WorkerTaskMonitor private final WorkerCuratorCoordinator workerCuratorCoordinator; private final TaskRunner taskRunner; private final ExecutorService exec; - private final List running = new CopyOnWriteArrayList(); + + private final BlockingQueue notices = new LinkedBlockingDeque<>(); + private final Map running = new ConcurrentHashMap<>(); + + private final Object lifecycleLock = new Object(); + private volatile boolean started = false; @Inject public WorkerTaskMonitor( ObjectMapper jsonMapper, CuratorFramework cf, WorkerCuratorCoordinator workerCuratorCoordinator, - TaskRunner taskRunner, - WorkerConfig workerConfig + TaskRunner taskRunner ) { this.jsonMapper = jsonMapper; @@ -78,164 +86,353 @@ public WorkerTaskMonitor( this.cf = cf; this.workerCuratorCoordinator = workerCuratorCoordinator; this.taskRunner = taskRunner; - - this.exec = Execs.multiThreaded(workerConfig.getCapacity(), "WorkerTaskMonitor-%d"); + this.exec = Execs.singleThreaded("WorkerTaskMonitor"); } /** * Register a monitor for new tasks. When new tasks appear, the worker node announces a status to indicate it has - * started the task. When the task is complete, the worker node updates the status. It is up to the coordinator to - * determine how many tasks are sent to each worker node and cleanup tasks and statuses in ZK accordingly. + * started the task. When the task is complete, the worker node updates the status. */ @LifecycleStart public void start() { - try { - // restore restorable tasks - final List>> restored = taskRunner.restore(); - for (Pair> pair : restored) { - submitTaskRunnable(pair.lhs, pair.rhs); - } - - // cleanup any old running task announcements which are invalid after restart - for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()) { - if (!isTaskRunning(announcement.getTaskStatus().getId()) && announcement.getTaskStatus().isRunnable()) { - workerCuratorCoordinator.updateAnnouncement( - TaskAnnouncement.create( - announcement.getTaskStatus().getId(), - announcement.getTaskResource(), - TaskStatus.failure(announcement.getTaskStatus().getId()) - ) - ); - } - } + synchronized (lifecycleLock) { + Preconditions.checkState(!started, "already started"); + Preconditions.checkState(!exec.isShutdown(), "already stopped"); + started = true; - pathChildrenCache.getListenable().addListener( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) - throws Exception + try { + restoreRestorableTasks(); + cleanupStaleAnnouncements(); + registerRunListener(); + registerLocationListener(); + pathChildrenCache.start(); + exec.submit( + new Runnable() { - if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { - final Task task = jsonMapper.readValue( - cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()), - Task.class - ); - - submitTaskRunnable(task, null); + @Override + public void run() + { + mainLoop(); } } - } - ); - pathChildrenCache.start(); - } - catch (Exception e) { - log.makeAlert(e, "Exception starting WorkerTaskMonitor") - .addData("exception", e.toString()) - .emit(); + ); + + log.info("Started WorkerTaskMonitor."); + started = true; + } + catch (Exception e) { + log.makeAlert(e, "Exception starting WorkerTaskMonitor") + .emit(); + throw Throwables.propagate(e); + } } } - private void submitTaskRunnable(final Task task, final ListenableFuture taskStatusAlreadySubmitted) + private void mainLoop() { - if (isTaskRunning(task.getId())) { - log.warn( - "I can't build it. There's something in the way. Got task %s that I am already running...", - task.getId() - ); - workerCuratorCoordinator.unannounceTask(task.getId()); - return; + try { + while (!Thread.currentThread().isInterrupted()) { + final Notice notice = notices.take(); + + try { + notice.handle(); + } + catch (Exception e) { + log.makeAlert(e, "Failed to handle notice") + .addData("noticeClass", notice.getClass().getSimpleName()) + .addData("noticeTaskId", notice.getTaskId()) + .emit(); + } + } + } + catch (InterruptedException e) { + log.info("WorkerTaskMonitor interrupted, exiting."); } + } - log.info("Submitting runnable for task[%s]", task.getId()); + private void restoreRestorableTasks() + { + final List>> restored = taskRunner.restore(); + for (Pair> pair : restored) { + addRunningTask(pair.lhs, pair.rhs); + } + } - running.add(task); + private void cleanupStaleAnnouncements() + { + // cleanup any old running task announcements which are invalid after restart + for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()) { + if (!running.containsKey(announcement.getTaskStatus().getId()) && announcement.getTaskStatus().isRunnable()) { + log.info("Cleaning up stale announcement for task [%s].", announcement.getTaskStatus().getId()); + workerCuratorCoordinator.updateTaskStatusAnnouncement( + TaskAnnouncement.create( + announcement.getTaskStatus().getId(), + announcement.getTaskResource(), + TaskStatus.failure(announcement.getTaskStatus().getId()), + TaskLocation.unknown() + ) + ); + } + } + } - exec.submit( - new Runnable() + private void registerRunListener() + { + pathChildrenCache.getListenable().addListener( + new PathChildrenCacheListener() { @Override - public void run() + public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) + throws Exception { - final long startTime = System.currentTimeMillis(); - - TaskStatus taskStatus; - - try { - workerCuratorCoordinator.updateAnnouncement( - TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()) - ) + if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { + final Task task = jsonMapper.readValue( + cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()), + Task.class ); - if (taskStatusAlreadySubmitted != null) { - log.info("Affirmative. Connecting to already-running task [%s]", task.getId()); - taskStatus = taskStatusAlreadySubmitted.get(); - } else { - log.info("Affirmative. Running task [%s]", task.getId()); - workerCuratorCoordinator.unannounceTask(task.getId()); - taskStatus = taskRunner.run(task).get(); - } - } - catch (InterruptedException e) { - log.debug(e, "Interrupted while running task[%s], exiting.", task.getId()); - return; - } - catch (Exception e) { - log.makeAlert(e, "I can't build there. Failed to run task") - .addData("task", task.getId()) - .emit(); - taskStatus = TaskStatus.failure(task.getId()); - } - finally { - running.remove(task); + notices.add(new RunNotice(task)); } + } + } + ); + } - taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime); + private void registerLocationListener() + { + taskRunner.registerListener( + new TaskRunnerListener() + { + @Override + public void locationChanged(final String taskId, final TaskLocation newLocation) + { + notices.add(new LocationNotice(taskId, newLocation)); + } + }, + MoreExecutors.sameThreadExecutor() + ); + } - try { - workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus)); - log.info( - "Job's finished. Completed [%s] with status [%s]", - task.getId(), - taskStatus.getStatusCode() - ); - } - catch (Exception e) { - log.makeAlert(e, "Failed to update task status") - .addData("task", task.getId()) - .emit(); - } + private void addRunningTask(final Task task, final ListenableFuture future) + { + running.put(task.getId(), new TaskDetails(task)); + Futures.addCallback( + future, + new FutureCallback() + { + @Override + public void onSuccess(TaskStatus result) + { + notices.add(new StatusNotice(task, result)); + } + + @Override + public void onFailure(Throwable t) + { + notices.add(new StatusNotice(task, TaskStatus.failure(task.getId()))); } } ); } - private boolean isTaskRunning(final String taskId) + @LifecycleStop + public void stop() { - for (final Task runningTask : running) { - if (runningTask.getId().equals(taskId)) { - return true; + synchronized (lifecycleLock) { + Preconditions.checkState(started, "not started"); + + try { + exec.shutdownNow(); + pathChildrenCache.close(); + taskRunner.stop(); + + started = false; + log.info("Stopped WorkerTaskMonitor."); } + catch (Exception e) { + log.makeAlert(e, "Exception stopping WorkerTaskMonitor") + .emit(); + } + } + } + + private static class TaskDetails + { + private final Task task; + private final long startTime; + private TaskStatus status; + private TaskLocation location; + + public TaskDetails(Task task) + { + this.task = task; + this.startTime = System.currentTimeMillis(); + this.status = TaskStatus.running(task.getId()); + this.location = TaskLocation.unknown(); } + } + + private interface Notice + { + String getTaskId(); - return false; + void handle(); } - @LifecycleStop - public void stop() + private class RunNotice implements Notice { - try { - pathChildrenCache.close(); - exec.shutdownNow(); - taskRunner.stop(); - } - catch (Exception e) { - log.makeAlert(e, "Exception stopping WorkerTaskMonitor") - .addData("exception", e.toString()) - .emit(); + private final Task task; + + public RunNotice(Task task) + { + this.task = task; + } + + @Override + public String getTaskId() + { + return task.getId(); + } + + @Override + public void handle() + { + if (running.containsKey(task.getId())) { + log.warn( + "Got run notice for task [%s] that I am already running...", + task.getId() + ); + workerCuratorCoordinator.removeTaskRunZnode(task.getId()); + return; + } + + log.info("Submitting runnable for task[%s]", task.getId()); + + workerCuratorCoordinator.updateTaskStatusAnnouncement( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()), + TaskLocation.unknown() + ) + ); + + log.info("Affirmative. Running task [%s]", task.getId()); + workerCuratorCoordinator.removeTaskRunZnode(task.getId()); + final ListenableFuture future = taskRunner.run(task); + addRunningTask(task, future); + } + } + + private class StatusNotice implements Notice + { + private final Task task; + private final TaskStatus status; + + public StatusNotice(Task task, TaskStatus status) + { + this.task = task; + this.status = status; + } + + @Override + public String getTaskId() + { + return task.getId(); + } + + @Override + public void handle() + { + final TaskDetails details = running.get(task.getId()); + + if (details == null) { + log.warn("Got status notice for task [%s] that isn't running...", task.getId()); + return; + } + + if (!status.isComplete()) { + log.warn( + "WTF?! Got status notice for task [%s] that isn't complete (status = [%s])...", + task.getId(), + status.getStatusCode() + ); + return; + } + + details.status = status.withDuration(System.currentTimeMillis() - details.startTime); + + try { + workerCuratorCoordinator.updateTaskStatusAnnouncement( + TaskAnnouncement.create( + details.task, + details.status, + details.location + ) + ); + log.info( + "Job's finished. Completed [%s] with status [%s]", + task.getId(), + status.getStatusCode() + ); + } + catch (Exception e) { + log.makeAlert(e, "Failed to update task announcement") + .addData("task", task.getId()) + .emit(); + } + finally { + running.remove(task.getId()); + } + } + } + + private class LocationNotice implements Notice + { + private final String taskId; + private final TaskLocation location; + + public LocationNotice(String taskId, TaskLocation location) + { + this.taskId = taskId; + this.location = location; + } + + @Override + public String getTaskId() + { + return taskId; + } + + @Override + public void handle() + { + final TaskDetails details = running.get(taskId); + + if (details == null) { + log.warn("Got location notice for task [%s] that isn't running...", taskId); + return; + } + + if (!Objects.equals(details.location, location)) { + details.location = location; + + try { + log.info("Updating task [%s] announcement with location [%s]", taskId, location); + workerCuratorCoordinator.updateTaskStatusAnnouncement( + TaskAnnouncement.create( + details.task, + details.status, + details.location + ) + ); + } + catch (Exception e) { + log.makeAlert(e, "Failed to update task announcement") + .addData("task", taskId) + .emit(); + } + } } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java deleted file mode 100644 index 92bbc8709d37..000000000000 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestMergeTask.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.indexing.common; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.collect.Lists; -import io.druid.indexing.common.task.MergeTask; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.IndexSpec; -import io.druid.timeline.DataSegment; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -import java.util.List; - -/** - */ -@JsonTypeName("test") -public class TestMergeTask extends MergeTask -{ - public static TestMergeTask createDummyTask(String taskId) - { - return new TestMergeTask( - taskId, - "dummyDs", - Lists.newArrayList( - new DataSegment( - "dummyDs", - new Interval(new DateTime(), new DateTime().plus(1)), - new DateTime().toString(), - null, - null, - null, - null, - 0, - 0 - ) - ), - Lists.newArrayList(), - new IndexSpec() - ); - } - - private final String id; - - @JsonCreator - public TestMergeTask( - @JsonProperty("id") String id, - @JsonProperty("dataSource") String dataSource, - @JsonProperty("segments") List segments, - @JsonProperty("aggregations") List aggregators, - @JsonProperty("indexSpec") IndexSpec indexSpec - ) - { - super(id, dataSource, segments, aggregators, indexSpec, null); - this.id = id; - } - - @Override - @JsonProperty - public String getType() - { - return "test"; - } - - @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception - { - return TaskStatus.running(id); - } -} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java new file mode 100644 index 000000000000..c9faf99ddda6 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java @@ -0,0 +1,108 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.AbstractTask; +import io.druid.indexing.common.task.Task; + +public class TestTasks +{ + private static final String DATASOURCE = "dummyDs"; + + public static void registerSubtypes(ObjectMapper mapper) + { + mapper.registerSubtypes(ImmediateSuccessTask.class, UnendingTask.class); + } + + public static Task immediateSuccess(String id) + { + return new ImmediateSuccessTask(id); + } + + public static Task unending(String id) + { + return new UnendingTask(id); + } + + @JsonTypeName("immediateSuccess") + public static class ImmediateSuccessTask extends AbstractTask + { + @JsonCreator + public ImmediateSuccessTask(@JsonProperty("id") String id) + { + super(id, DATASOURCE, null); + } + + @Override + public String getType() + { + return "immediateSuccess"; + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) throws Exception + { + return true; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + return TaskStatus.success(getId()); + } + } + + @JsonTypeName("unending") + public static class UnendingTask extends AbstractTask + { + @JsonCreator + public UnendingTask(@JsonProperty("id") String id) + { + super(id, DATASOURCE, null); + } + + @Override + public String getType() + { + return "unending"; + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) throws Exception + { + return true; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(1000); + } + + return TaskStatus.failure(getId()); + } + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 57209c793faa..d551db3dcbaa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -35,15 +35,14 @@ import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.indexing.common.IndexingServiceCondition; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestRealtimeTask; +import io.druid.indexing.common.TestTasks; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; -import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; -import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.worker.TaskAnnouncement; @@ -77,6 +76,7 @@ public class RemoteTaskRunnerTest private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath); private static final String statusPath = String.format("%s/indexer/status/worker", basePath); private static final int TIMEOUT_SECONDS = 20; + private static final TaskLocation DUMMY_LOCATION = TaskLocation.create("dummy", 9000); private ObjectMapper jsonMapper; @@ -84,7 +84,7 @@ public class RemoteTaskRunnerTest private CuratorFramework cf; private RemoteTaskRunner remoteTaskRunner; - private TestMergeTask task; + private Task task; private Worker worker; @@ -107,7 +107,7 @@ public void setUp() throws Exception cf.create().creatingParentsIfNeeded().forPath(basePath); cf.create().creatingParentsIfNeeded().forPath(tasksPath); - task = TestMergeTask.createDummyTask("task"); + task = TestTasks.unending("task"); } @After @@ -191,7 +191,7 @@ public void testRunTooMuchZKData() throws Exception doSetup(); - remoteTaskRunner.run(TestMergeTask.createDummyTask(new String(new char[5000]))); + remoteTaskRunner.run(TestTasks.unending(new String(new char[5000]))); EasyMock.verify(emitter); } @@ -557,7 +557,7 @@ private void mockWorkerRunningTask(final Task task) throws Exception { cf.delete().forPath(joiner.join(tasksPath, task.getId())); - TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId())); + TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()), DUMMY_LOCATION); cf.create() .creatingParentsIfNeeded() .forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); @@ -565,7 +565,7 @@ private void mockWorkerRunningTask(final Task task) throws Exception private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception { - TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId())); + TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()), DUMMY_LOCATION); cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 1ed7caf47d9a..ff0e99f13838 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -100,6 +100,7 @@ import io.druid.segment.realtime.FireDepartmentTest; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.DruidNode; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; @@ -217,20 +218,20 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private static ServiceEmitter newMockEmitter() { - return new ServiceEmitter(null, null, null) + return new ServiceEmitter(null, null, null) + { + @Override + public void emit(Event event) { - @Override - public void emit(Event event) - { - } + } - @Override - public void emit(ServiceEventBuilder builder) - { + @Override + public void emit(ServiceEventBuilder builder) + { - } - }; + } + }; } private static InputRow IR(String dt, String dim1, String dim2, float met) @@ -607,7 +608,12 @@ private TaskRunner setUpThreadPoolTaskRunner(TaskToolboxFactory tb) Preconditions.checkNotNull(taskConfig); Preconditions.checkNotNull(emitter); - return new ThreadPoolTaskRunner(tb, taskConfig, emitter); + return new ThreadPoolTaskRunner( + tb, + taskConfig, + emitter, + new DruidNode("dummy", "dummy", 10000) + ); } private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java index 32adbafc5c47..4cf2fae0bc90 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java @@ -28,8 +28,9 @@ import com.metamx.emitter.service.ServiceEventBuilder; import io.druid.common.guava.DSuppliers; import io.druid.concurrent.Execs; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TestMergeTask; +import io.druid.indexing.common.TestTasks; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.RemoteTaskRunner; @@ -39,12 +40,8 @@ import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; import io.druid.jackson.DefaultObjectMapper; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.IndexSpec; -import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.DateTime; -import org.joda.time.Interval; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -72,28 +69,7 @@ public class SimpleResourceManagementStrategyTest public void setUp() throws Exception { autoScaler = EasyMock.createMock(AutoScaler.class); - - final IndexSpec indexSpec = new IndexSpec(); - - testTask = new TestMergeTask( - "task1", - "dummyDs", - Lists.newArrayList( - new DataSegment( - "dummyDs", - new Interval("2012-01-01/2012-01-02"), - new DateTime().toString(), - null, - null, - null, - null, - 0, - 0 - ) - ), - Lists.newArrayList(), - indexSpec - ); + testTask = TestTasks.immediateSuccess("task1"); final SimpleResourceManagementConfig simpleResourceManagementConfig = new SimpleResourceManagementConfig() .setWorkerIdleTimeout(new Period(0)) @@ -138,7 +114,7 @@ public void testSuccessfulProvision() throws Exception RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ) ); EasyMock.expect(runner.getWorkers()).andReturn( @@ -174,7 +150,7 @@ public void testSomethingProvisioning() throws Exception RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( @@ -231,7 +207,7 @@ public void testProvisionAlert() throws Exception RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( @@ -282,7 +258,7 @@ public void testDoSuccessfulTerminate() throws Exception RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( @@ -324,7 +300,7 @@ public void testSomethingTerminating() throws Exception RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( @@ -373,7 +349,7 @@ public void testNoActionNeeded() throws Exception RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( @@ -482,7 +458,7 @@ public void testNullWorkerConfig() throws Exception RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( Collections.singletonList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( @@ -536,7 +512,14 @@ public Map getRunningTasks() if (testTask == null) { return Maps.newHashMap(); } - return ImmutableMap.of(testTask.getId(), TaskAnnouncement.create(testTask, TaskStatus.running(testTask.getId()))); + return ImmutableMap.of( + testTask.getId(), + TaskAnnouncement.create( + testTask, + TaskStatus.running(testTask.getId()), + TaskLocation.unknown() + ) + ); } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 59890f5b48ba..5ef4fd3c8c03 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -33,6 +33,7 @@ import io.druid.concurrent.Execs; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.discovery.NoopServiceAnnouncer; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskStorageConfig; @@ -43,6 +44,7 @@ import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunnerFactory; +import io.druid.indexing.overlord.TaskRunnerListener; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorageQueryAdapter; @@ -73,10 +75,13 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; public class OverlordResourceTest { + private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy", 1000); + private TestingServer server; private Timing timing; private CuratorFramework curator; @@ -234,6 +239,10 @@ public void testOverlordResource() throws Exception response = overlordResource.getRunningTasks(); // 1 task that was manually inserted should be in running state Assert.assertEquals(1, (((List) response.getEntity()).size())); + final OverlordResource.TaskResponseObject taskResponseObject = ((List) response + .getEntity()).get(0); + Assert.assertEquals(taskId_1, taskResponseObject.toJson().get("id")); + Assert.assertEquals(TASK_LOCATION, taskResponseObject.toJson().get("location")); // Simulate completion of task_1 taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown(); @@ -291,6 +300,12 @@ public List>> restore() return ImmutableList.of(); } + public void registerListener(TaskRunnerListener listener, Executor executor) + { + // Overlord doesn't call this method + throw new UnsupportedOperationException(); + } + @Override public synchronized ListenableFuture run(final Task task) { @@ -319,7 +334,14 @@ public TaskStatus call() throws Exception } } ); - TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(taskId, future); + TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(taskId, future) + { + @Override + public TaskLocation getLocation() + { + return TASK_LOCATION; + } + }; taskRunnerWorkItems.put(taskId, taskRunnerWorkItem); return future; } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index 941545ec4d58..1d160e99bff6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -20,6 +20,7 @@ package io.druid.indexing.worker; import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.RealtimeIndexTask; @@ -77,7 +78,7 @@ public Plumber findPlumber( null ); final TaskStatus status = TaskStatus.running(task.getId()); - final TaskAnnouncement announcement = TaskAnnouncement.create(task, status); + final TaskAnnouncement announcement = TaskAnnouncement.create(task, status, TaskLocation.unknown()); final String statusJson = jsonMapper.writeValueAsString(status); final String announcementJson = jsonMapper.writeValueAsString(announcement); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 0105cde6172e..bfe2264926a0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -29,8 +29,8 @@ import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolboxFactory; -import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestRealtimeTask; +import io.druid.indexing.common.TestTasks; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -38,7 +38,6 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; -import io.druid.indexing.worker.config.WorkerConfig; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexMergerV9; @@ -46,6 +45,7 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.DruidNode; import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -71,13 +71,14 @@ public class WorkerTaskMonitorTest private static final String basePath = "/test/druid"; private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath); private static final String statusPath = String.format("%s/indexer/status/worker", basePath); + private static final DruidNode DUMMY_NODE = new DruidNode("dummy", "dummy", 9000); private TestingCluster testingCluster; private CuratorFramework cf; private WorkerCuratorCoordinator workerCuratorCoordinator; private WorkerTaskMonitor workerTaskMonitor; - private TestMergeTask task; + private Task task; private Worker worker; private ObjectMapper jsonMapper; @@ -138,11 +139,11 @@ public String getBase() // Start a task monitor workerTaskMonitor = createTaskMonitor(); - jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test")); + TestTasks.registerSubtypes(jsonMapper); jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime")); workerTaskMonitor.start(); - task = TestMergeTask.createDummyTask("test"); + task = TestTasks.immediateSuccess("test"); } private WorkerTaskMonitor createTaskMonitor() @@ -192,21 +193,22 @@ public List getLocations() indexMergerV9 ), taskConfig, - new NoopServiceEmitter() - ), - new WorkerConfig().setCapacity(1) + new NoopServiceEmitter(), + DUMMY_NODE + ) ); } @After public void tearDown() throws Exception { + workerCuratorCoordinator.stop(); workerTaskMonitor.stop(); cf.close(); testingCluster.stop(); } - @Test + @Test(timeout = 30_000L) public void testRunTask() throws Exception { Assert.assertTrue( @@ -239,7 +241,12 @@ public boolean isValid() public boolean isValid() { try { - return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null; + final byte[] bytes = cf.getData().forPath(joiner.join(statusPath, task.getId())); + final TaskAnnouncement announcement = jsonMapper.readValue( + bytes, + TaskAnnouncement.class + ); + return announcement.getTaskStatus().isComplete(); } catch (Exception e) { return false; @@ -254,10 +261,10 @@ public boolean isValid() ); Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId()); - Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskAnnouncement.getTaskStatus().getStatusCode()); } - @Test + @Test(timeout = 30_000L) public void testGetAnnouncements() throws Exception { cf.create() @@ -272,7 +279,12 @@ public void testGetAnnouncements() throws Exception public boolean isValid() { try { - return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null; + final byte[] bytes = cf.getData().forPath(joiner.join(statusPath, task.getId())); + final TaskAnnouncement announcement = jsonMapper.readValue( + bytes, + TaskAnnouncement.class + ); + return announcement.getTaskStatus().isComplete(); } catch (Exception e) { return false; @@ -285,12 +297,16 @@ public boolean isValid() List announcements = workerCuratorCoordinator.getAnnouncements(); Assert.assertEquals(1, announcements.size()); Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId()); - Assert.assertEquals(TaskStatus.Status.RUNNING, announcements.get(0).getTaskStatus().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, announcements.get(0).getTaskStatus().getStatusCode()); + Assert.assertEquals(DUMMY_NODE.getHost(), announcements.get(0).getTaskLocation().getHost()); + Assert.assertEquals(DUMMY_NODE.getPort(), announcements.get(0).getTaskLocation().getPort()); } - @Test + @Test(timeout = 30_000L) public void testRestartCleansOldStatus() throws Exception { + task = TestTasks.unending("test"); + cf.create() .creatingParentsIfNeeded() .forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task)); @@ -322,7 +338,7 @@ public boolean isValid() Assert.assertEquals(TaskStatus.Status.FAILED, announcements.get(0).getTaskStatus().getStatusCode()); } - @Test + @Test(timeout = 30_000L) public void testStatusAnnouncementsArePersistent() throws Exception { cf.create()