From 6f4d9ebb753a09e0ee32ec488aae67071c0c761b Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Mon, 27 Dec 2021 12:08:21 -0800 Subject: [PATCH 1/7] concurrency: introduce GuardedBy to TaskQueue --- .../druid/indexing/overlord/TaskQueue.java | 58 +++++++++++++++---- .../indexing/overlord/TaskQueueTest.java | 2 + 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 01dec0dad445..bb23b016a426 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; @@ -80,7 +81,9 @@ public class TaskQueue { private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); + @GuardedBy("giant") private final List tasks = new ArrayList<>(); + @GuardedBy("giant") private final Map> taskFutures = new HashMap<>(); private final TaskLockConfig lockConfig; @@ -111,7 +114,9 @@ public class TaskQueue private final ConcurrentHashMap totalSuccessfulTaskCount = new ConcurrentHashMap<>(); private final ConcurrentHashMap totalFailedTaskCount = new ConcurrentHashMap<>(); + @GuardedBy("totalSuccessfulTaskCount") private Map prevTotalSuccessfulTaskCount = new HashMap<>(); + @GuardedBy("totalFailedTaskCount") private Map prevTotalFailedTaskCount = new HashMap<>(); public TaskQueue( @@ -267,6 +272,7 @@ private void manage() throws InterruptedException } @VisibleForTesting + @GuardedBy("giant") void manageInternal() { // Task futures available from the taskRunner @@ -392,6 +398,7 @@ public boolean add(final Task task) throws EntryExistsException } // Should always be called after taking giantLock + @GuardedBy("giant") private void addTaskInternal(final Task task) { tasks.add(task); @@ -399,6 +406,7 @@ private void addTaskInternal(final Task task) } // Should always be called after taking giantLock + @GuardedBy("giant") private void removeTaskInternal(final Task task) { taskLockbox.remove(task); @@ -684,22 +692,37 @@ private Map getDeltaValues(Map total, Map getSuccessfulTaskCount() { Map total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); - Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); - prevTotalSuccessfulTaskCount = total; - return delta; + synchronized (totalSuccessfulTaskCount) { + Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); + prevTotalSuccessfulTaskCount = total; + return delta; + } } public Map getFailedTaskCount() { Map total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); - Map delta = getDeltaValues(total, prevTotalFailedTaskCount); - prevTotalFailedTaskCount = total; - return delta; + synchronized (totalFailedTaskCount) { + Map delta = getDeltaValues(total, prevTotalFailedTaskCount); + prevTotalFailedTaskCount = total; + return delta; + } + } + + Map getCurrentTaskDatasources() + { + giant.lock(); + try { + return tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + } + finally { + giant.unlock(); + } } public Map getRunningTaskCount() { - Map taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + Map taskDatasources = getCurrentTaskDatasources(); return taskRunner.getRunningTasks() .stream() .collect(Collectors.toMap( @@ -711,7 +734,7 @@ public Map getRunningTaskCount() public Map getPendingTaskCount() { - Map taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + Map taskDatasources = getCurrentTaskDatasources(); return taskRunner.getPendingTasks() .stream() .collect(Collectors.toMap( @@ -727,13 +750,26 @@ public Map getWaitingTaskCount() .stream() .map(TaskRunnerWorkItem::getTaskId) .collect(Collectors.toSet()); - return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) - .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); + + giant.lock(); + try { + return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) + .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); + } + finally { + giant.unlock(); + } } @VisibleForTesting List getTasks() { - return tasks; + giant.lock(); + try { + return new ArrayList(tasks); + } + finally { + giant.unlock(); + } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index dc5fc0e1c6a1..81154af5f969 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -75,6 +75,7 @@ public class TaskQueueTest extends IngestionTestBase * APIs. */ @Test + @SuppressWarnings("GuardedBy") public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception { final TaskActionClientFactory actionClientFactory = createActionClientFactory(); @@ -315,6 +316,7 @@ public void testUserProvidedContextOverrideLockConfig() throws EntryExistsExcept } @Test + @SuppressWarnings("GuardedBy") public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException { final TaskActionClientFactory actionClientFactory = createActionClientFactory(); From db28dd8c0887800be9a5855be670f80224350481 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Mon, 27 Dec 2021 10:45:56 -0800 Subject: [PATCH 2/7] perf: Introduce TaskQueueScaleTest to test performance of TaskQueue with large task counts This introduces a test case to confirm how long it will take to launch and manage (aka shutdown) a large number of threads in the TaskQueue. h/t to @gianm for main implementation. --- .../indexing/overlord/TaskQueueScaleTest.java | 475 ++++++++++++++++++ 1 file changed, 475 insertions(+) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java new file mode 100644 index 000000000000..fd8c78d12937 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.actions.TaskAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.config.TaskStorageConfig; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Tests that {@link TaskQueue} is able to handle large numbers of concurrently-running tasks. + */ +public class TaskQueueScaleTest +{ + private static final String DATASOURCE = "ds"; + + private final int numTasks = 1000; + + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + + private TaskQueue taskQueue; + private TaskStorage taskStorage; + private TestTaskRunner taskRunner; + private Closer closer; + + @Before + public void setUp() throws Exception + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + + closer = Closer.create(); + + // Be as realistic as possible; use actual classes for storage rather than mocks. + taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(Period.hours(1))); + taskRunner = new TestTaskRunner(); + closer.register(taskRunner::stop); + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + + final IndexerSQLMetadataStorageCoordinator storageCoordinator = new IndexerSQLMetadataStorageCoordinator( + jsonMapper, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + derbyConnectorRule.getConnector() + ); + + final TaskActionClientFactory unsupportedTaskActionFactory = + task -> new TaskActionClient() + { + @Override + public RetType submit(TaskAction taskAction) + { + throw new UnsupportedOperationException(); + } + }; + + taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, Period.millis(1), null, null), + new DefaultTaskConfig(), + taskStorage, + taskRunner, + unsupportedTaskActionFactory, // Not used for anything serious + new TaskLockbox(taskStorage, storageCoordinator), + new NoopServiceEmitter() + ); + + taskQueue.start(); + closer.register(taskQueue::stop); + } + + @After + public void tearDown() throws Exception + { + closer.close(); + } + + @Test(timeout = 60_000L) // more than enough time if the task queue is efficient + public void doMassLaunchAndExit() throws Exception + { + Assert.assertEquals("no tasks should be running", 0, taskRunner.getKnownTasks().size()); + + // Add all tasks. + for (int i = 0; i < numTasks; i++) { + final TestTask testTask = new TestTask(i, 2000L /* runtime millis */); + taskQueue.add(testTask); + } + + // Wait for all tasks to finish. + while (taskStorage.getRecentlyCreatedAlreadyFinishedTaskInfo( + numTasks, + Duration.standardHours(1), + DATASOURCE).size() < numTasks) { + Thread.sleep(100); + } + + Assert.assertEquals("no tasks should be running", 0, taskStorage.getActiveTasks().size()); + } + + @Test(timeout = 60_000L) // more than enough time if the task queue is efficient + public void doMassLaunchAndShutdown() throws Exception + { + Assert.assertEquals("no tasks should be running", 0, taskRunner.getKnownTasks().size()); + + // Add all tasks. + final List taskIds = new ArrayList<>(); + for (int i = 0; i < numTasks; i++) { + final TestTask testTask = new TestTask( + i, + Duration.standardHours(1).getMillis() /* very long runtime millis, so we can do a shutdown */ + ); + taskQueue.add(testTask); + taskIds.add(testTask.getId()); + } + + // wait for all tasks to progress to running state + while (taskStorage.getActiveTasks().size() < numTasks) { + Thread.sleep(100); + } + Assert.assertEquals("all tasks should be running", numTasks, taskStorage.getActiveTasks().size()); + + // Shut down all tasks. + for (final String taskId : taskIds) { + taskQueue.shutdown(taskId, "test shutdown"); + } + + // Wait for all tasks to finish. + while (!taskStorage.getActiveTasks().isEmpty()) { + Thread.sleep(100); + } + + Assert.assertEquals("no tasks should be running", 0, taskStorage.getActiveTasks().size()); + + int completed = taskStorage.getRecentlyCreatedAlreadyFinishedTaskInfo( + numTasks, + Duration.standardHours(1), + DATASOURCE).size(); + Assert.assertEquals("all tasks should have completed", numTasks, completed); + } + + private static class TestTask extends NoopTask + { + private final int number; + private final long runtime; + + public TestTask(int number, long runtime) + { + super(null, null, DATASOURCE, 0, 0, null, null, Collections.emptyMap()); + this.number = number; + this.runtime = runtime; + } + + public int getNumber() + { + return number; + } + + public long getRuntimeMillis() + { + return runtime; + } + } + + private static class TestTaskRunner implements TaskRunner + { + private static final Logger log = new Logger(TestTaskRunner.class); + private static final Duration T_PENDING_TO_RUNNING = Duration.standardSeconds(2); + private static final Duration T_SHUTDOWN_ACK = Duration.millis(8); + private static final Duration T_SHUTDOWN_COMPLETE = Duration.standardSeconds(2); + + @GuardedBy("knownTasks") + private final Map knownTasks = new HashMap<>(); + + private final ScheduledExecutorService exec = ScheduledExecutors.fixed(8, "TaskQueueScaleTest-%s"); + + @Override + public void start() + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture run(Task task) + { + // Production task runners generally do not take a long time to execute "run", but may take a long time to + // go from "running" to "pending". + synchronized (knownTasks) { + final TestTaskRunnerWorkItem item = knownTasks.computeIfAbsent(task.getId(), TestTaskRunnerWorkItem::new); + exec.schedule( + () -> { + try { + synchronized (knownTasks) { + final TestTaskRunnerWorkItem item2 = knownTasks.get(task.getId()); + if (item2.getState() == RunnerTaskState.PENDING) { + knownTasks.put(task.getId(), item2.withState(RunnerTaskState.RUNNING)); + } + } + + exec.schedule( + () -> { + try { + final TestTaskRunnerWorkItem item2; + synchronized (knownTasks) { + item2 = knownTasks.get(task.getId()); + } + if (item2 != null) { + item2.setResult(TaskStatus.success(task.getId())); + } + } + catch (Throwable e) { + log.error(e, "Error in scheduled executor"); + } + }, + ((TestTask) task).getRuntimeMillis(), + TimeUnit.MILLISECONDS + ); + } + catch (Throwable e) { + log.error(e, "Error in scheduled executor"); + } + }, + T_PENDING_TO_RUNNING.getMillis(), + TimeUnit.MILLISECONDS + ); + + return item.getResult(); + } + } + + @Override + public void shutdown(String taskid, String reason) + { + // Production task runners take a long time to execute "shutdown" if the task is currently running. + synchronized (knownTasks) { + if (!knownTasks.containsKey(taskid)) { + return; + } + } + + threadSleep(T_SHUTDOWN_ACK); + + final TestTaskRunnerWorkItem existingTask; + synchronized (knownTasks) { + existingTask = knownTasks.get(taskid); + } + if (!existingTask.getResult().isDone()) { + exec.schedule(() -> { + existingTask.setResult(TaskStatus.failure("taskId", "stopped")); + synchronized (knownTasks) { + knownTasks.remove(taskid); + } + }, T_SHUTDOWN_COMPLETE.getMillis(), TimeUnit.MILLISECONDS); + } + } + + static void threadSleep(Duration duration) + { + try { + Thread.sleep(duration.getMillis()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + public void registerListener(TaskRunnerListener listener, Executor executor) + { + throw new UnsupportedOperationException(); + } + + @Override + public void unregisterListener(String listenerId) + { + throw new UnsupportedOperationException(); + } + + @Override + public List>> restore() + { + // Do nothing, and return null. (TaskQueue doesn't use the return value.) + return null; + } + + @Override + public void stop() + { + exec.shutdownNow(); + } + + @Override + public Collection getRunningTasks() + { + synchronized (knownTasks) { + return knownTasks.values() + .stream() + .filter(item -> item.getState() == RunnerTaskState.RUNNING) + .collect(Collectors.toList()); + } + } + + @Override + public Collection getPendingTasks() + { + synchronized (knownTasks) { + return knownTasks.values() + .stream() + .filter(item -> item.getState() == RunnerTaskState.PENDING) + .collect(Collectors.toList()); + } + } + + @Override + public Collection getKnownTasks() + { + synchronized (knownTasks) { + return ImmutableList.copyOf(knownTasks.values()); + } + } + + @Override + public Optional getScalingStats() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getTotalTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getIdleTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getUsedTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getLazyTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getBlacklistedTaskSlotCount() + { + throw new UnsupportedOperationException(); + } + } + + private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem + { + private final RunnerTaskState state; + + public TestTaskRunnerWorkItem(final String taskId) + { + this(taskId, SettableFuture.create(), RunnerTaskState.PENDING); + } + + private TestTaskRunnerWorkItem( + final String taskId, + final ListenableFuture result, + final RunnerTaskState state + ) + { + super(taskId, result); + this.state = state; + } + + public RunnerTaskState getState() + { + return state; + } + + @Override + public TaskLocation getLocation() + { + return TaskLocation.unknown(); + } + + @Nullable + @Override + public String getTaskType() + { + throw new UnsupportedOperationException(); + } + + @Override + public String getDataSource() + { + throw new UnsupportedOperationException(); + } + + public void setResult(final TaskStatus result) + { + ((SettableFuture) getResult()).set(result); + + // possibly a parallel shutdown request was issued during the + // shutdown time; ignore it + } + + public TestTaskRunnerWorkItem withState(final RunnerTaskState newState) + { + return new TestTaskRunnerWorkItem(getTaskId(), getResult(), newState); + } + } +} + From ef94f4fd8a5204ddd81cfb2465055604e2ba6ea8 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Mon, 27 Dec 2021 13:20:34 -0800 Subject: [PATCH 3/7] perf: improve scalability of TaskQueue with large task counts --- .../druid/indexing/overlord/TaskQueue.java | 211 ++++++++++++------ .../indexing/overlord/TaskQueueTest.java | 2 - 2 files changed, 146 insertions(+), 67 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index bb23b016a426..6d122796f4ef 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -54,9 +54,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -64,7 +67,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -80,6 +82,7 @@ public class TaskQueue { private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); + private final long MIN_WAIT_TIME_MS = 100; @GuardedBy("giant") private final List tasks = new ArrayList<>(); @@ -96,7 +99,7 @@ public class TaskQueue private final ServiceEmitter emitter; private final ReentrantLock giant = new ReentrantLock(true); - private final Condition managementMayBeNecessary = giant.newCondition(); + private final BlockingQueue managementMayBeNecessary = new ArrayBlockingQueue<>(8); private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() .setDaemon(false) @@ -212,7 +215,7 @@ public ScheduledExecutors.Signal call() } } ); - managementMayBeNecessary.signalAll(); + requestManagement(); } finally { giant.unlock(); @@ -233,7 +236,7 @@ public void stop() active = false; managerExec.shutdownNow(); storageSyncExec.shutdownNow(); - managementMayBeNecessary.signalAll(); + requestManagement(); } finally { giant.unlock(); @@ -245,6 +248,51 @@ public boolean isActive() return active; } + /** + * Request management from the management thread. Non-blocking. + * + * Other callers (such as notifyStatus) should trigger activity on the + * TaskQueue thread by requesting management here. + */ + void requestManagement() + { + // use a BlockingQueue since the offer/poll/wait behaviour is simple + // and very easy to reason about + + // the request has to be offer (non blocking), since someone might request + // while already holding giant lock + + // do not care if the item fits into the queue: + // if the queue is already full, request has been triggered anyway + managementMayBeNecessary.offer(this); + } + + /** + * Await for an event to manage. + * + * This should only be called from the management thread to wait for activity. + * + * @param nanos + * @throws InterruptedException + */ + void awaitManagementNanos(long nanos) throws InterruptedException + { + // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops + try { + Thread.sleep(MIN_WAIT_TIME_MS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // wait for an item, if an item arrives (or is already available), complete immediately + // (does not actually matter what the item is) + managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS); + + // there may have been multiple requests, clear them all + managementMayBeNecessary.clear(); + } + /** * Main task runner management loop. Meant to run forever, or, at least until we're stopped. */ @@ -257,32 +305,54 @@ private void manage() throws InterruptedException taskRunner.restore(); while (active) { - giant.lock(); + manageInternal(); - try { - manageInternal(); - // awaitNanos because management may become necessary without this condition signalling, - // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox. - managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS); - } - finally { - giant.unlock(); - } + // awaitNanos because management may become necessary without this condition signalling, + // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox. + awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS); } } @VisibleForTesting - @GuardedBy("giant") void manageInternal() + { + Set knownTaskIds = new HashSet<>(); + Map> runnerTaskFutures = new HashMap<>(); + + giant.lock(); + + try { + manageInternalCritical(knownTaskIds, runnerTaskFutures); + } + finally { + giant.unlock(); + } + + manageInternalPostCritical(knownTaskIds, runnerTaskFutures); + } + + + /** + * Management loop critical section tasks. + * + * @param knownTaskIds will be modified - filled with known task IDs + * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks + */ + @GuardedBy("giant") + private void manageInternalCritical( + final Set knownTaskIds, + final Map> runnerTaskFutures + ) { // Task futures available from the taskRunner - final Map> runnerTaskFutures = new HashMap<>(); for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); } // Attain futures for all active tasks (assuming they are ready to run). // Copy tasks list, as notifyStatus may modify it. for (final Task task : ImmutableList.copyOf(tasks)) { + knownTaskIds.add(task.getId()); + if (!taskFutures.containsKey(task.getId())) { final ListenableFuture runnerTaskFuture; if (runnerTaskFutures.containsKey(task.getId())) { @@ -323,11 +393,15 @@ void manageInternal() taskRunner.run(task); } } + } + + @VisibleForTesting + private void manageInternalPostCritical( + final Set knownTaskIds, + final Map> runnerTaskFutures + ) + { // Kill tasks that shouldn't be running - final Set knownTaskIds = tasks - .stream() - .map(Task::getId) - .collect(Collectors.toSet()); final Set tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds); if (!tasksToKill.isEmpty()) { log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size()); @@ -389,7 +463,7 @@ public boolean add(final Task task) throws EntryExistsException // insert the task into our queue. So don't catch it. taskStorage.insert(task, TaskStatus.running(task.getId())); addTaskInternal(task); - managementMayBeNecessary.signalAll(); + requestManagement(); return true; } finally { @@ -477,30 +551,34 @@ public void shutdownWithSuccess(final String taskId, String reasonFormat, Object */ private void notifyStatus(final Task task, final TaskStatus taskStatus, String reasonFormat, Object... args) { - giant.lock(); + Preconditions.checkNotNull(task, "task"); + Preconditions.checkNotNull(taskStatus, "status"); + Preconditions.checkState(active, "Queue is not active!"); + Preconditions.checkArgument( + task.getId().equals(taskStatus.getId()), + "Mismatching task ids[%s/%s]", + task.getId(), + taskStatus.getId() + ); + + // Inform taskRunner that this task can be shut down TaskLocation taskLocation = TaskLocation.unknown(); + try { + taskLocation = taskRunner.getTaskLocation(task.getId()); + taskRunner.shutdown(task.getId(), reasonFormat, args); + } + catch (Exception e) { + log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); + } + + int removed = 0; + ///////// critical section + + giant.lock(); try { - Preconditions.checkNotNull(task, "task"); - Preconditions.checkNotNull(taskStatus, "status"); - Preconditions.checkState(active, "Queue is not active!"); - Preconditions.checkArgument( - task.getId().equals(taskStatus.getId()), - "Mismatching task ids[%s/%s]", - task.getId(), - taskStatus.getId() - ); - // Inform taskRunner that this task can be shut down - try { - taskLocation = taskRunner.getTaskLocation(task.getId()); - taskRunner.shutdown(task.getId(), reasonFormat, args); - } - catch (Exception e) { - log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); - } // Remove from running tasks - int removed = 0; for (int i = tasks.size() - 1; i >= 0; i--) { if (tasks.get(i).getId().equals(task.getId())) { removed++; @@ -508,36 +586,39 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r break; } } - if (removed == 0) { - log.warn("Unknown task completed: %s", task.getId()); - } else if (removed > 1) { - log.makeAlert("Removed multiple copies of task").addData("count", removed).addData("task", task.getId()).emit(); - } + // Remove from futures list taskFutures.remove(task.getId()); - if (removed > 0) { - // If we thought this task should be running, save status to DB - try { - final Optional previousStatus = taskStorage.getStatus(task.getId()); - if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { - log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); - } else { - taskStorage.setStatus(taskStatus.withLocation(taskLocation)); - log.info("Task done: %s", task); - managementMayBeNecessary.signalAll(); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to persist status for task") - .addData("task", task.getId()) - .addData("statusCode", taskStatus.getStatusCode()) - .emit(); - } - } } finally { giant.unlock(); } + + ///////// end critical + + if (removed == 0) { + log.warn("Unknown task completed: %s", task.getId()); + } + + if (removed > 0) { + // If we thought this task should be running, save status to DB + try { + final Optional previousStatus = taskStorage.getStatus(task.getId()); + if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { + log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); + } else { + taskStorage.setStatus(taskStatus.withLocation(taskLocation)); + log.info("Task done: %s", task); + requestManagement(); + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to persist status for task") + .addData("task", task.getId()) + .addData("statusCode", taskStatus.getStatusCode()) + .emit(); + } + } } /** @@ -659,7 +740,7 @@ private void syncFromStorage() addedTasks.size(), removedTasks.size() ); - managementMayBeNecessary.signalAll(); + requestManagement(); } else { log.info("Not active. Skipping storage sync."); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 81154af5f969..dc5fc0e1c6a1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -75,7 +75,6 @@ public class TaskQueueTest extends IngestionTestBase * APIs. */ @Test - @SuppressWarnings("GuardedBy") public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception { final TaskActionClientFactory actionClientFactory = createActionClientFactory(); @@ -316,7 +315,6 @@ public void testUserProvidedContextOverrideLockConfig() throws EntryExistsExcept } @Test - @SuppressWarnings("GuardedBy") public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException { final TaskActionClientFactory actionClientFactory = createActionClientFactory(); From 22f633bfe6989827049795c0060d9c3c64be22fa Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Wed, 5 Jan 2022 17:03:12 -0800 Subject: [PATCH 4/7] linter fixes, expand test coverage --- .../druid/indexing/overlord/TaskQueue.java | 4 +++- .../indexing/overlord/TaskQueueScaleTest.java | 22 +++++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 6d122796f4ef..684f1fc8ffd0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.annotations.SuppressFBWarnings; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; @@ -99,6 +100,7 @@ public class TaskQueue private final ServiceEmitter emitter; private final ReentrantLock giant = new ReentrantLock(true); + //noinspection MismatchedCollectionQueryUpdate private final BlockingQueue managementMayBeNecessary = new ArrayBlockingQueue<>(8); private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() @@ -275,6 +277,7 @@ void requestManagement() * @param nanos * @throws InterruptedException */ + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value") void awaitManagementNanos(long nanos) throws InterruptedException { // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops @@ -561,7 +564,6 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r taskStatus.getId() ); - // Inform taskRunner that this task can be shut down TaskLocation taskLocation = TaskLocation.unknown(); try { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index fd8c78d12937..3f256a0a09f9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -85,7 +85,7 @@ public class TaskQueueScaleTest private Closer closer; @Before - public void setUp() throws Exception + public void setUp() { EmittingLogger.registerEmitter(new NoopServiceEmitter()); @@ -138,6 +138,8 @@ public void tearDown() throws Exception public void doMassLaunchAndExit() throws Exception { Assert.assertEquals("no tasks should be running", 0, taskRunner.getKnownTasks().size()); + Assert.assertEquals("no tasks should be known", 0, taskQueue.getTasks().size()); + Assert.assertEquals("no tasks should be running", 0, taskQueue.getRunningTaskCount().size()); // Add all tasks. for (int i = 0; i < numTasks; i++) { @@ -145,6 +147,13 @@ public void doMassLaunchAndExit() throws Exception taskQueue.add(testTask); } + // in theory we can get a race here, since we fetch the counts at separate times + Assert.assertEquals("all tasks should be known", numTasks, taskQueue.getTasks().size()); + long runningTasks = taskQueue.getRunningTaskCount().values().stream().mapToLong(Long::longValue).sum(); + long pendingTasks = taskQueue.getPendingTaskCount().values().stream().mapToLong(Long::longValue).sum(); + long waitingTasks = taskQueue.getWaitingTaskCount().values().stream().mapToLong(Long::longValue).sum(); + Assert.assertEquals("all tasks should be known", numTasks, (runningTasks + pendingTasks + waitingTasks)); + // Wait for all tasks to finish. while (taskStorage.getRecentlyCreatedAlreadyFinishedTaskInfo( numTasks, @@ -153,7 +162,15 @@ public void doMassLaunchAndExit() throws Exception Thread.sleep(100); } - Assert.assertEquals("no tasks should be running", 0, taskStorage.getActiveTasks().size()); + Thread.sleep(100); + + Assert.assertEquals("no tasks should be active", 0, taskStorage.getActiveTasks().size()); + runningTasks = taskQueue.getRunningTaskCount().values().stream().mapToLong(Long::longValue).sum(); + pendingTasks = taskQueue.getPendingTaskCount().values().stream().mapToLong(Long::longValue).sum(); + waitingTasks = taskQueue.getWaitingTaskCount().values().stream().mapToLong(Long::longValue).sum(); + Assert.assertEquals("no tasks should be running", 0, runningTasks); + Assert.assertEquals("no tasks should be pending", 0, pendingTasks); + Assert.assertEquals("no tasks should be waiting", 0, waitingTasks); } @Test(timeout = 60_000L) // more than enough time if the task queue is efficient @@ -261,6 +278,7 @@ public ListenableFuture run(Task task) final TestTaskRunnerWorkItem item2; synchronized (knownTasks) { item2 = knownTasks.get(task.getId()); + knownTasks.put(task.getId(), item2.withState(RunnerTaskState.NONE)); } if (item2 != null) { item2.setResult(TaskStatus.success(task.getId())); From 823664d561c1f20641d435fd233479fe757523bf Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Thu, 6 Jan 2022 09:27:15 -0800 Subject: [PATCH 5/7] pr feedback suggestion; swap to different linter --- .../main/java/org/apache/druid/indexing/overlord/TaskQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 684f1fc8ffd0..d5179c75bec6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -100,7 +100,7 @@ public class TaskQueue private final ServiceEmitter emitter; private final ReentrantLock giant = new ReentrantLock(true); - //noinspection MismatchedCollectionQueryUpdate + //noinspection MismatchedQueryAndUpdateOfCollection private final BlockingQueue managementMayBeNecessary = new ArrayBlockingQueue<>(8); private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() From 1ee151d817b0aaf29e88e9aa7334412209caace7 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Thu, 6 Jan 2022 09:41:53 -0800 Subject: [PATCH 6/7] swap to use SuppressWarnings --- .../main/java/org/apache/druid/indexing/overlord/TaskQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index d5179c75bec6..4c4636a8daae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -100,7 +100,7 @@ public class TaskQueue private final ServiceEmitter emitter; private final ReentrantLock giant = new ReentrantLock(true); - //noinspection MismatchedQueryAndUpdateOfCollection + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") private final BlockingQueue managementMayBeNecessary = new ArrayBlockingQueue<>(8); private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() From 17808ff1bf5c20f2b574e15e4be99c701e363db5 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 21 Apr 2022 17:49:55 -0700 Subject: [PATCH 7/7] Fix TaskQueueScaleTest. --- .../indexing/overlord/TaskQueueScaleTest.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index 3f256a0a09f9..d305b0d6c9b2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -155,10 +156,10 @@ public void doMassLaunchAndExit() throws Exception Assert.assertEquals("all tasks should be known", numTasks, (runningTasks + pendingTasks + waitingTasks)); // Wait for all tasks to finish. - while (taskStorage.getRecentlyCreatedAlreadyFinishedTaskInfo( - numTasks, - Duration.standardHours(1), - DATASOURCE).size() < numTasks) { + final TaskLookup.CompleteTaskLookup completeTaskLookup = + TaskLookup.CompleteTaskLookup.of(numTasks, Duration.standardHours(1)); + + while (taskStorage.getTaskInfos(completeTaskLookup, DATASOURCE).size() < numTasks) { Thread.sleep(100); } @@ -207,10 +208,10 @@ public void doMassLaunchAndShutdown() throws Exception Assert.assertEquals("no tasks should be running", 0, taskStorage.getActiveTasks().size()); - int completed = taskStorage.getRecentlyCreatedAlreadyFinishedTaskInfo( - numTasks, - Duration.standardHours(1), - DATASOURCE).size(); + int completed = taskStorage.getTaskInfos( + TaskLookup.CompleteTaskLookup.of(numTasks, Duration.standardHours(1)), + DATASOURCE + ).size(); Assert.assertEquals("all tasks should have completed", numTasks, completed); }