diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index a506f33521be..50d5d8b4fc7a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -28,6 +28,8 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.annotations.SuppressFBWarnings; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; @@ -53,9 +55,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -63,7 +68,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -79,8 +83,11 @@ public class TaskQueue { private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); + private final long MIN_WAIT_TIME_MS = 100; + @GuardedBy("giant") private final List tasks = new ArrayList<>(); + @GuardedBy("giant") private final Map> taskFutures = new HashMap<>(); private final TaskLockConfig lockConfig; @@ -93,7 +100,8 @@ public class TaskQueue private final ServiceEmitter emitter; private final ReentrantLock giant = new ReentrantLock(true); - private final Condition managementMayBeNecessary = giant.newCondition(); + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + private final BlockingQueue managementMayBeNecessary = new ArrayBlockingQueue<>(8); private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() .setDaemon(false) @@ -111,7 +119,9 @@ public class TaskQueue private final ConcurrentHashMap totalSuccessfulTaskCount = new ConcurrentHashMap<>(); private final ConcurrentHashMap totalFailedTaskCount = new ConcurrentHashMap<>(); + @GuardedBy("totalSuccessfulTaskCount") private Map prevTotalSuccessfulTaskCount = new HashMap<>(); + @GuardedBy("totalFailedTaskCount") private Map prevTotalFailedTaskCount = new HashMap<>(); public TaskQueue( @@ -207,7 +217,7 @@ public ScheduledExecutors.Signal call() } } ); - managementMayBeNecessary.signalAll(); + requestManagement(); } finally { giant.unlock(); @@ -228,7 +238,7 @@ public void stop() active = false; managerExec.shutdownNow(); storageSyncExec.shutdownNow(); - managementMayBeNecessary.signalAll(); + requestManagement(); } finally { giant.unlock(); @@ -240,6 +250,52 @@ public boolean isActive() return active; } + /** + * Request management from the management thread. Non-blocking. + * + * Other callers (such as notifyStatus) should trigger activity on the + * TaskQueue thread by requesting management here. + */ + void requestManagement() + { + // use a BlockingQueue since the offer/poll/wait behaviour is simple + // and very easy to reason about + + // the request has to be offer (non blocking), since someone might request + // while already holding giant lock + + // do not care if the item fits into the queue: + // if the queue is already full, request has been triggered anyway + managementMayBeNecessary.offer(this); + } + + /** + * Await for an event to manage. + * + * This should only be called from the management thread to wait for activity. + * + * @param nanos + * @throws InterruptedException + */ + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value") + void awaitManagementNanos(long nanos) throws InterruptedException + { + // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops + try { + Thread.sleep(MIN_WAIT_TIME_MS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // wait for an item, if an item arrives (or is already available), complete immediately + // (does not actually matter what the item is) + managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS); + + // there may have been multiple requests, clear them all + managementMayBeNecessary.clear(); + } + /** * Main task runner management loop. Meant to run forever, or, at least until we're stopped. */ @@ -252,31 +308,54 @@ private void manage() throws InterruptedException taskRunner.restore(); while (active) { - giant.lock(); + manageInternal(); - try { - manageInternal(); - // awaitNanos because management may become necessary without this condition signalling, - // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox. - managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS); - } - finally { - giant.unlock(); - } + // awaitNanos because management may become necessary without this condition signalling, + // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox. + awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS); } } @VisibleForTesting void manageInternal() + { + Set knownTaskIds = new HashSet<>(); + Map> runnerTaskFutures = new HashMap<>(); + + giant.lock(); + + try { + manageInternalCritical(knownTaskIds, runnerTaskFutures); + } + finally { + giant.unlock(); + } + + manageInternalPostCritical(knownTaskIds, runnerTaskFutures); + } + + + /** + * Management loop critical section tasks. + * + * @param knownTaskIds will be modified - filled with known task IDs + * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks + */ + @GuardedBy("giant") + private void manageInternalCritical( + final Set knownTaskIds, + final Map> runnerTaskFutures + ) { // Task futures available from the taskRunner - final Map> runnerTaskFutures = new HashMap<>(); for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); } // Attain futures for all active tasks (assuming they are ready to run). // Copy tasks list, as notifyStatus may modify it. for (final Task task : ImmutableList.copyOf(tasks)) { + knownTaskIds.add(task.getId()); + if (!taskFutures.containsKey(task.getId())) { final ListenableFuture runnerTaskFuture; if (runnerTaskFutures.containsKey(task.getId())) { @@ -317,11 +396,15 @@ void manageInternal() taskRunner.run(task); } } + } + + @VisibleForTesting + private void manageInternalPostCritical( + final Set knownTaskIds, + final Map> runnerTaskFutures + ) + { // Kill tasks that shouldn't be running - final Set knownTaskIds = tasks - .stream() - .map(Task::getId) - .collect(Collectors.toSet()); final Set tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds); if (!tasksToKill.isEmpty()) { log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size()); @@ -387,7 +470,7 @@ public boolean add(final Task task) throws EntryExistsException // insert the task into our queue. So don't catch it. taskStorage.insert(task, TaskStatus.running(task.getId())); addTaskInternal(task); - managementMayBeNecessary.signalAll(); + requestManagement(); return true; } finally { @@ -396,6 +479,7 @@ public boolean add(final Task task) throws EntryExistsException } // Should always be called after taking giantLock + @GuardedBy("giant") private void addTaskInternal(final Task task) { tasks.add(task); @@ -403,6 +487,7 @@ private void addTaskInternal(final Task task) } // Should always be called after taking giantLock + @GuardedBy("giant") private void removeTaskInternal(final Task task) { taskLockbox.remove(task); @@ -473,30 +558,33 @@ public void shutdownWithSuccess(final String taskId, String reasonFormat, Object */ private void notifyStatus(final Task task, final TaskStatus taskStatus, String reasonFormat, Object... args) { - giant.lock(); + Preconditions.checkNotNull(task, "task"); + Preconditions.checkNotNull(taskStatus, "status"); + Preconditions.checkState(active, "Queue is not active!"); + Preconditions.checkArgument( + task.getId().equals(taskStatus.getId()), + "Mismatching task ids[%s/%s]", + task.getId(), + taskStatus.getId() + ); + // Inform taskRunner that this task can be shut down TaskLocation taskLocation = TaskLocation.unknown(); + try { + taskLocation = taskRunner.getTaskLocation(task.getId()); + taskRunner.shutdown(task.getId(), reasonFormat, args); + } + catch (Exception e) { + log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); + } + + int removed = 0; + ///////// critical section + + giant.lock(); try { - Preconditions.checkNotNull(task, "task"); - Preconditions.checkNotNull(taskStatus, "status"); - Preconditions.checkState(active, "Queue is not active!"); - Preconditions.checkArgument( - task.getId().equals(taskStatus.getId()), - "Mismatching task ids[%s/%s]", - task.getId(), - taskStatus.getId() - ); - // Inform taskRunner that this task can be shut down - try { - taskLocation = taskRunner.getTaskLocation(task.getId()); - taskRunner.shutdown(task.getId(), reasonFormat, args); - } - catch (Exception e) { - log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); - } // Remove from running tasks - int removed = 0; for (int i = tasks.size() - 1; i >= 0; i--) { if (tasks.get(i).getId().equals(task.getId())) { removed++; @@ -504,36 +592,39 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r break; } } - if (removed == 0) { - log.warn("Unknown task completed: %s", task.getId()); - } else if (removed > 1) { - log.makeAlert("Removed multiple copies of task").addData("count", removed).addData("task", task.getId()).emit(); - } + // Remove from futures list taskFutures.remove(task.getId()); - if (removed > 0) { - // If we thought this task should be running, save status to DB - try { - final Optional previousStatus = taskStorage.getStatus(task.getId()); - if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { - log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); - } else { - taskStorage.setStatus(taskStatus.withLocation(taskLocation)); - log.info("Task done: %s", task); - managementMayBeNecessary.signalAll(); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to persist status for task") - .addData("task", task.getId()) - .addData("statusCode", taskStatus.getStatusCode()) - .emit(); - } - } } finally { giant.unlock(); } + + ///////// end critical + + if (removed == 0) { + log.warn("Unknown task completed: %s", task.getId()); + } + + if (removed > 0) { + // If we thought this task should be running, save status to DB + try { + final Optional previousStatus = taskStorage.getStatus(task.getId()); + if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { + log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); + } else { + taskStorage.setStatus(taskStatus.withLocation(taskLocation)); + log.info("Task done: %s", task); + requestManagement(); + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to persist status for task") + .addData("task", task.getId()) + .addData("statusCode", taskStatus.getStatusCode()) + .emit(); + } + } } /** @@ -655,7 +746,7 @@ private void syncFromStorage() addedTasks.size(), removedTasks.size() ); - managementMayBeNecessary.signalAll(); + requestManagement(); } else { log.info("Not active. Skipping storage sync."); } @@ -688,22 +779,37 @@ private Map getDeltaValues(Map total, Map getSuccessfulTaskCount() { Map total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); - Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); - prevTotalSuccessfulTaskCount = total; - return delta; + synchronized (totalSuccessfulTaskCount) { + Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); + prevTotalSuccessfulTaskCount = total; + return delta; + } } public Map getFailedTaskCount() { Map total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); - Map delta = getDeltaValues(total, prevTotalFailedTaskCount); - prevTotalFailedTaskCount = total; - return delta; + synchronized (totalFailedTaskCount) { + Map delta = getDeltaValues(total, prevTotalFailedTaskCount); + prevTotalFailedTaskCount = total; + return delta; + } + } + + Map getCurrentTaskDatasources() + { + giant.lock(); + try { + return tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + } + finally { + giant.unlock(); + } } public Map getRunningTaskCount() { - Map taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + Map taskDatasources = getCurrentTaskDatasources(); return taskRunner.getRunningTasks() .stream() .collect(Collectors.toMap( @@ -715,7 +821,7 @@ public Map getRunningTaskCount() public Map getPendingTaskCount() { - Map taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + Map taskDatasources = getCurrentTaskDatasources(); return taskRunner.getPendingTasks() .stream() .collect(Collectors.toMap( @@ -731,13 +837,26 @@ public Map getWaitingTaskCount() .stream() .map(TaskRunnerWorkItem::getTaskId) .collect(Collectors.toSet()); - return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) - .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); + + giant.lock(); + try { + return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) + .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); + } + finally { + giant.unlock(); + } } @VisibleForTesting List getTasks() { - return tasks; + giant.lock(); + try { + return new ArrayList(tasks); + } + finally { + giant.unlock(); + } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java new file mode 100644 index 000000000000..d305b0d6c9b2 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.actions.TaskAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.config.TaskStorageConfig; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.metadata.TaskLookup; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Tests that {@link TaskQueue} is able to handle large numbers of concurrently-running tasks. + */ +public class TaskQueueScaleTest +{ + private static final String DATASOURCE = "ds"; + + private final int numTasks = 1000; + + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + + private TaskQueue taskQueue; + private TaskStorage taskStorage; + private TestTaskRunner taskRunner; + private Closer closer; + + @Before + public void setUp() + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + + closer = Closer.create(); + + // Be as realistic as possible; use actual classes for storage rather than mocks. + taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(Period.hours(1))); + taskRunner = new TestTaskRunner(); + closer.register(taskRunner::stop); + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + + final IndexerSQLMetadataStorageCoordinator storageCoordinator = new IndexerSQLMetadataStorageCoordinator( + jsonMapper, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + derbyConnectorRule.getConnector() + ); + + final TaskActionClientFactory unsupportedTaskActionFactory = + task -> new TaskActionClient() + { + @Override + public RetType submit(TaskAction taskAction) + { + throw new UnsupportedOperationException(); + } + }; + + taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, Period.millis(1), null, null), + new DefaultTaskConfig(), + taskStorage, + taskRunner, + unsupportedTaskActionFactory, // Not used for anything serious + new TaskLockbox(taskStorage, storageCoordinator), + new NoopServiceEmitter() + ); + + taskQueue.start(); + closer.register(taskQueue::stop); + } + + @After + public void tearDown() throws Exception + { + closer.close(); + } + + @Test(timeout = 60_000L) // more than enough time if the task queue is efficient + public void doMassLaunchAndExit() throws Exception + { + Assert.assertEquals("no tasks should be running", 0, taskRunner.getKnownTasks().size()); + Assert.assertEquals("no tasks should be known", 0, taskQueue.getTasks().size()); + Assert.assertEquals("no tasks should be running", 0, taskQueue.getRunningTaskCount().size()); + + // Add all tasks. + for (int i = 0; i < numTasks; i++) { + final TestTask testTask = new TestTask(i, 2000L /* runtime millis */); + taskQueue.add(testTask); + } + + // in theory we can get a race here, since we fetch the counts at separate times + Assert.assertEquals("all tasks should be known", numTasks, taskQueue.getTasks().size()); + long runningTasks = taskQueue.getRunningTaskCount().values().stream().mapToLong(Long::longValue).sum(); + long pendingTasks = taskQueue.getPendingTaskCount().values().stream().mapToLong(Long::longValue).sum(); + long waitingTasks = taskQueue.getWaitingTaskCount().values().stream().mapToLong(Long::longValue).sum(); + Assert.assertEquals("all tasks should be known", numTasks, (runningTasks + pendingTasks + waitingTasks)); + + // Wait for all tasks to finish. + final TaskLookup.CompleteTaskLookup completeTaskLookup = + TaskLookup.CompleteTaskLookup.of(numTasks, Duration.standardHours(1)); + + while (taskStorage.getTaskInfos(completeTaskLookup, DATASOURCE).size() < numTasks) { + Thread.sleep(100); + } + + Thread.sleep(100); + + Assert.assertEquals("no tasks should be active", 0, taskStorage.getActiveTasks().size()); + runningTasks = taskQueue.getRunningTaskCount().values().stream().mapToLong(Long::longValue).sum(); + pendingTasks = taskQueue.getPendingTaskCount().values().stream().mapToLong(Long::longValue).sum(); + waitingTasks = taskQueue.getWaitingTaskCount().values().stream().mapToLong(Long::longValue).sum(); + Assert.assertEquals("no tasks should be running", 0, runningTasks); + Assert.assertEquals("no tasks should be pending", 0, pendingTasks); + Assert.assertEquals("no tasks should be waiting", 0, waitingTasks); + } + + @Test(timeout = 60_000L) // more than enough time if the task queue is efficient + public void doMassLaunchAndShutdown() throws Exception + { + Assert.assertEquals("no tasks should be running", 0, taskRunner.getKnownTasks().size()); + + // Add all tasks. + final List taskIds = new ArrayList<>(); + for (int i = 0; i < numTasks; i++) { + final TestTask testTask = new TestTask( + i, + Duration.standardHours(1).getMillis() /* very long runtime millis, so we can do a shutdown */ + ); + taskQueue.add(testTask); + taskIds.add(testTask.getId()); + } + + // wait for all tasks to progress to running state + while (taskStorage.getActiveTasks().size() < numTasks) { + Thread.sleep(100); + } + Assert.assertEquals("all tasks should be running", numTasks, taskStorage.getActiveTasks().size()); + + // Shut down all tasks. + for (final String taskId : taskIds) { + taskQueue.shutdown(taskId, "test shutdown"); + } + + // Wait for all tasks to finish. + while (!taskStorage.getActiveTasks().isEmpty()) { + Thread.sleep(100); + } + + Assert.assertEquals("no tasks should be running", 0, taskStorage.getActiveTasks().size()); + + int completed = taskStorage.getTaskInfos( + TaskLookup.CompleteTaskLookup.of(numTasks, Duration.standardHours(1)), + DATASOURCE + ).size(); + Assert.assertEquals("all tasks should have completed", numTasks, completed); + } + + private static class TestTask extends NoopTask + { + private final int number; + private final long runtime; + + public TestTask(int number, long runtime) + { + super(null, null, DATASOURCE, 0, 0, null, null, Collections.emptyMap()); + this.number = number; + this.runtime = runtime; + } + + public int getNumber() + { + return number; + } + + public long getRuntimeMillis() + { + return runtime; + } + } + + private static class TestTaskRunner implements TaskRunner + { + private static final Logger log = new Logger(TestTaskRunner.class); + private static final Duration T_PENDING_TO_RUNNING = Duration.standardSeconds(2); + private static final Duration T_SHUTDOWN_ACK = Duration.millis(8); + private static final Duration T_SHUTDOWN_COMPLETE = Duration.standardSeconds(2); + + @GuardedBy("knownTasks") + private final Map knownTasks = new HashMap<>(); + + private final ScheduledExecutorService exec = ScheduledExecutors.fixed(8, "TaskQueueScaleTest-%s"); + + @Override + public void start() + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture run(Task task) + { + // Production task runners generally do not take a long time to execute "run", but may take a long time to + // go from "running" to "pending". + synchronized (knownTasks) { + final TestTaskRunnerWorkItem item = knownTasks.computeIfAbsent(task.getId(), TestTaskRunnerWorkItem::new); + exec.schedule( + () -> { + try { + synchronized (knownTasks) { + final TestTaskRunnerWorkItem item2 = knownTasks.get(task.getId()); + if (item2.getState() == RunnerTaskState.PENDING) { + knownTasks.put(task.getId(), item2.withState(RunnerTaskState.RUNNING)); + } + } + + exec.schedule( + () -> { + try { + final TestTaskRunnerWorkItem item2; + synchronized (knownTasks) { + item2 = knownTasks.get(task.getId()); + knownTasks.put(task.getId(), item2.withState(RunnerTaskState.NONE)); + } + if (item2 != null) { + item2.setResult(TaskStatus.success(task.getId())); + } + } + catch (Throwable e) { + log.error(e, "Error in scheduled executor"); + } + }, + ((TestTask) task).getRuntimeMillis(), + TimeUnit.MILLISECONDS + ); + } + catch (Throwable e) { + log.error(e, "Error in scheduled executor"); + } + }, + T_PENDING_TO_RUNNING.getMillis(), + TimeUnit.MILLISECONDS + ); + + return item.getResult(); + } + } + + @Override + public void shutdown(String taskid, String reason) + { + // Production task runners take a long time to execute "shutdown" if the task is currently running. + synchronized (knownTasks) { + if (!knownTasks.containsKey(taskid)) { + return; + } + } + + threadSleep(T_SHUTDOWN_ACK); + + final TestTaskRunnerWorkItem existingTask; + synchronized (knownTasks) { + existingTask = knownTasks.get(taskid); + } + if (!existingTask.getResult().isDone()) { + exec.schedule(() -> { + existingTask.setResult(TaskStatus.failure("taskId", "stopped")); + synchronized (knownTasks) { + knownTasks.remove(taskid); + } + }, T_SHUTDOWN_COMPLETE.getMillis(), TimeUnit.MILLISECONDS); + } + } + + static void threadSleep(Duration duration) + { + try { + Thread.sleep(duration.getMillis()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + public void registerListener(TaskRunnerListener listener, Executor executor) + { + throw new UnsupportedOperationException(); + } + + @Override + public void unregisterListener(String listenerId) + { + throw new UnsupportedOperationException(); + } + + @Override + public List>> restore() + { + // Do nothing, and return null. (TaskQueue doesn't use the return value.) + return null; + } + + @Override + public void stop() + { + exec.shutdownNow(); + } + + @Override + public Collection getRunningTasks() + { + synchronized (knownTasks) { + return knownTasks.values() + .stream() + .filter(item -> item.getState() == RunnerTaskState.RUNNING) + .collect(Collectors.toList()); + } + } + + @Override + public Collection getPendingTasks() + { + synchronized (knownTasks) { + return knownTasks.values() + .stream() + .filter(item -> item.getState() == RunnerTaskState.PENDING) + .collect(Collectors.toList()); + } + } + + @Override + public Collection getKnownTasks() + { + synchronized (knownTasks) { + return ImmutableList.copyOf(knownTasks.values()); + } + } + + @Override + public Optional getScalingStats() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getTotalTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getIdleTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getUsedTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getLazyTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getBlacklistedTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + } + + private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem + { + private final RunnerTaskState state; + + public TestTaskRunnerWorkItem(final String taskId) + { + this(taskId, SettableFuture.create(), RunnerTaskState.PENDING); + } + + private TestTaskRunnerWorkItem( + final String taskId, + final ListenableFuture result, + final RunnerTaskState state + ) + { + super(taskId, result); + this.state = state; + } + + public RunnerTaskState getState() + { + return state; + } + + @Override + public TaskLocation getLocation() + { + return TaskLocation.unknown(); + } + + @Nullable + @Override + public String getTaskType() + { + throw new UnsupportedOperationException(); + } + + @Override + public String getDataSource() + { + throw new UnsupportedOperationException(); + } + + public void setResult(final TaskStatus result) + { + ((SettableFuture) getResult()).set(result); + + // possibly a parallel shutdown request was issued during the + // shutdown time; ignore it + } + + public TestTaskRunnerWorkItem withState(final RunnerTaskState newState) + { + return new TestTaskRunnerWorkItem(getTaskId(), getResult(), newState); + } + } +} +