From 9d309c4b83c878560bcba1fda3d79b566c0a15f7 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 3 Oct 2022 17:14:49 +0530 Subject: [PATCH 1/8] Fix Overlord leader election when task lock re-acquisition fails --- .../druid/indexing/overlord/TaskLockbox.java | 27 ++++++++-- .../indexing/overlord/TaskLockboxTest.java | 52 +++++++++++++++++++ 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index a53af645913b..3665e1869fc4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -29,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.inject.Inject; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentLock; import org.apache.druid.indexing.common.TaskLock; @@ -139,6 +140,7 @@ public int compare(Pair left, Pair right) running.clear(); activeTasks.clear(); activeTasks.addAll(storedActiveTasks); + Set failedToReacquireLockTasks = new HashSet<>(); // Bookkeeping for a log message at the end int taskLockCount = 0; for (final Pair taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) { @@ -150,6 +152,11 @@ public int compare(Pair left, Pair right) continue; } + if (failedToReacquireLockTasks.contains(task)) { + log.info("Ignoring task[%s] as it failed to acquire at least one of its locks", task.getId()); + continue; + } + // Create a new taskLock if it doesn't have a proper priority, // so that every taskLock in memory has the priority. final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null @@ -183,8 +190,9 @@ public int compare(Pair left, Pair right) ); } } else { - throw new ISE( - "Could not reacquire lock on interval[%s] version[%s] for task: %s", + failedToReacquireLockTasks.add(task); + log.error( + "Could not reacquire lock on interval[%s] version[%s] for task: %s.", savedTaskLockWithPriority.getInterval(), savedTaskLockWithPriority.getVersion(), task.getId() @@ -197,17 +205,26 @@ public int compare(Pair left, Pair right) activeTasks.size(), storedLocks.size() - taskLockCount ); + + for (Task task : failedToReacquireLockTasks) { + for (TaskLock lock : taskStorage.getLocks(task.getId())) { + taskStorage.removeLock(task.getId(), lock); + } + taskStorage.setStatus(TaskStatus.failure(task.getId(), "Failed to reacquire lock")); + } } + finally { - giant.unlock(); - } + giant.unlock(); + } } /** * This method is called only in {@link #syncFromStorage()} and verifies the given task and the taskLock have the same * groupId, dataSource, and priority. */ - private TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) + @VisibleForTesting + protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) { giant.lock(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 1da1ae18f84d..5e7f82a4417c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1258,6 +1258,41 @@ public void testGetLockedIntervalsForRevokedLocks() throws Exception ); } + @Test + public void testFailedToReacquireTaskLock() throws Exception + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + + TaskLockbox testLockBox = new NullLockPosseTaskLockBox(taskStorage, metadataStorageCoordinator); + testLockBox.add(task); + testLockBox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + task, + Intervals.of("2017-05-01/2017-06-01"), + null) + ); + testLockBox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + task, + Intervals.of("2017-06-01/2017-07-01"), + null) + ); + testLockBox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + task, + Intervals.of("2017-07-01/2017-08-01"), + null) + ); + Assert.assertEquals(1, taskStorage.getActiveTasks().size()); + Assert.assertEquals(3, taskStorage.getLocks(task.getId()).size()); + + // Should not be able to create TaskLockPosse + testLockBox.syncFromStorage(); + + // Task should no longer be active and lock must not exist + Assert.assertTrue(taskStorage.getStatus(task.getId()).get().isFailure()); + Assert.assertEquals(0, taskStorage.getActiveTasks().size()); + Assert.assertEquals(0, taskStorage.getLocks(task.getId()).size()); + } + private Set getAllLocks(List tasks) { return tasks.stream() @@ -1383,4 +1418,21 @@ public TaskStatus run(TaskToolbox toolbox) return TaskStatus.failure("how?", "Dummy task status err msg"); } } + + private static class NullLockPosseTaskLockBox extends TaskLockbox + { + public NullLockPosseTaskLockBox( + TaskStorage taskStorage, + IndexerMetadataStorageCoordinator metadataStorageCoordinator + ) + { + super(taskStorage, metadataStorageCoordinator); + } + + @Override + protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) + { + return null; + } + } } From ba876b76386218a69ed3c876cce4091c2387b464 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 3 Oct 2022 19:46:29 +0530 Subject: [PATCH 2/8] Add SyncResult and process it better --- .../druid/indexing/overlord/SyncResult.java | 45 +++++++++++++++++++ .../druid/indexing/overlord/TaskLockbox.java | 15 ++++--- .../druid/indexing/overlord/TaskMaster.java | 22 ++++++++- .../indexing/overlord/TaskLockboxTest.java | 24 +++++----- 4 files changed, 88 insertions(+), 18 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java new file mode 100644 index 000000000000..2df3dbc45142 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import org.apache.druid.indexing.common.task.Task; + +import java.util.List; + +/** + * Result of TaskLockbox#syncFromStorage() + */ +class SyncResult +{ + private final List tasksToFail; + + SyncResult(List tasksToFail) + { + this.tasksToFail = tasksToFail; + } + + /** + * Return list of tasks whose status needs to be set to failed due to lock re-acquisition failure + */ + List getTasksToFail() + { + return tasksToFail; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 3665e1869fc4..2e6b18cd2660 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -29,7 +29,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.inject.Inject; -import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentLock; import org.apache.druid.indexing.common.TaskLock; @@ -109,8 +108,10 @@ public TaskLockbox( /** * Wipe out our current in-memory state and resync it from our bundled {@link TaskStorage}. + * + * @return SyncResult which needs to be processed by the caller */ - public void syncFromStorage() + public SyncResult syncFromStorage() { giant.lock(); @@ -206,17 +207,21 @@ public int compare(Pair left, Pair right) storedLocks.size() - taskLockCount ); + // unlock all TaskLockPosse created, and also remove lock entries from storage for (Task task : failedToReacquireLockTasks) { + unlockAll(task); for (TaskLock lock : taskStorage.getLocks(task.getId())) { taskStorage.removeLock(task.getId(), lock); } - taskStorage.setStatus(TaskStatus.failure(task.getId(), "Failed to reacquire lock")); } + return new SyncResult( + ImmutableList.copyOf(failedToReacquireLockTasks) + ); } finally { - giant.unlock(); - } + giant.unlock(); + } } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index b52a3c5c03ef..7690c9202bd8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -26,6 +26,7 @@ import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.discovery.DruidLeaderSelector.Listener; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.Task; @@ -113,8 +114,9 @@ public void becomeLeader() log.info("By the power of Grayskull, I have the power!"); try { - taskLockbox.syncFromStorage(); + SyncResult syncResult = taskLockbox.syncFromStorage(); taskRunner = runnerFactory.build(); + processSyncResult(syncResult, taskStorage, taskRunner); taskQueue = new TaskQueue( taskLockConfig, taskQueueConfig, @@ -416,4 +418,22 @@ public Map getBlacklistedTaskSlotCount() return null; } } + + /** + * Process the results of synchronization from storage + * + * @param syncResult to be processed + * @param taskStorage stores tasks + * @param taskRunner runs tasks and holds state + */ + private void processSyncResult(SyncResult syncResult, TaskStorage taskStorage, TaskRunner taskRunner) + { + String taskReacquisitionFailure = "Failed to reacquire lock"; + for (Task task : syncResult.getTasksToFail()) { + // Shutdown task if it is running + taskRunner.shutdown(task.getId(), taskReacquisitionFailure); + // Mark as failed + taskStorage.setStatus(TaskStatus.failure(task.getId(), taskReacquisitionFailure)); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 5e7f82a4417c..b14d2080a1f8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1264,19 +1264,19 @@ public void testFailedToReacquireTaskLock() throws Exception final Task task = NoopTask.create(); taskStorage.insert(task, TaskStatus.running(task.getId())); - TaskLockbox testLockBox = new NullLockPosseTaskLockBox(taskStorage, metadataStorageCoordinator); - testLockBox.add(task); - testLockBox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator); + testLockbox.add(task); + testLockbox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task, Intervals.of("2017-05-01/2017-06-01"), null) ); - testLockBox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + testLockbox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task, Intervals.of("2017-06-01/2017-07-01"), null) ); - testLockBox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + testLockbox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task, Intervals.of("2017-07-01/2017-08-01"), null) @@ -1284,12 +1284,12 @@ public void testFailedToReacquireTaskLock() throws Exception Assert.assertEquals(1, taskStorage.getActiveTasks().size()); Assert.assertEquals(3, taskStorage.getLocks(task.getId()).size()); - // Should not be able to create TaskLockPosse - testLockBox.syncFromStorage(); + // The task must be marked for failure + SyncResult result = testLockbox.syncFromStorage(); + Assert.assertEquals(1, result.getTasksToFail().size()); + Assert.assertEquals(task, result.getTasksToFail().get(0)); - // Task should no longer be active and lock must not exist - Assert.assertTrue(taskStorage.getStatus(task.getId()).get().isFailure()); - Assert.assertEquals(0, taskStorage.getActiveTasks().size()); + // Task must no longer have active locks Assert.assertEquals(0, taskStorage.getLocks(task.getId()).size()); } @@ -1419,9 +1419,9 @@ public TaskStatus run(TaskToolbox toolbox) } } - private static class NullLockPosseTaskLockBox extends TaskLockbox + private static class NullLockPosseTaskLockbox extends TaskLockbox { - public NullLockPosseTaskLockBox( + public NullLockPosseTaskLockbox( TaskStorage taskStorage, IndexerMetadataStorageCoordinator metadataStorageCoordinator ) From 6665d21f284693fa18462099d2339a44d56c3d81 Mon Sep 17 00:00:00 2001 From: Amatya Date: Fri, 7 Oct 2022 16:43:31 +0530 Subject: [PATCH 3/8] test changes --- .../druid/indexing/overlord/SyncResult.java | 9 +++-- .../druid/indexing/overlord/TaskLockbox.java | 38 +++++++++---------- .../druid/indexing/overlord/TaskMaster.java | 29 ++++++++++---- 3 files changed, 43 insertions(+), 33 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java index 2df3dbc45142..28eaeb71435c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java @@ -21,16 +21,17 @@ import org.apache.druid.indexing.common.task.Task; -import java.util.List; +import java.util.Set; /** * Result of TaskLockbox#syncFromStorage() + * Contains tasks which need to be forcefully failed to let the overlord become the leader */ class SyncResult { - private final List tasksToFail; + private final Set tasksToFail; - SyncResult(List tasksToFail) + SyncResult(Set tasksToFail) { this.tasksToFail = tasksToFail; } @@ -38,7 +39,7 @@ class SyncResult /** * Return list of tasks whose status needs to be set to failed due to lock re-acquisition failure */ - List getTasksToFail() + Set getTasksToFail() { return tasksToFail; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 2e6b18cd2660..01aa74ec8c42 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -117,10 +117,8 @@ public SyncResult syncFromStorage() try { // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. - final Set storedActiveTasks = new HashSet<>(); final List> storedLocks = new ArrayList<>(); for (final Task task : taskStorage.getActiveTasks()) { - storedActiveTasks.add(task.getId()); for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { storedLocks.add(Pair.of(task, taskLock)); } @@ -140,8 +138,7 @@ public int compare(Pair left, Pair right) }; running.clear(); activeTasks.clear(); - activeTasks.addAll(storedActiveTasks); - Set failedToReacquireLockTasks = new HashSet<>(); + final Set failedToReacquireLockTaskGroups = new HashSet<>(); // Bookkeeping for a log message at the end int taskLockCount = 0; for (final Pair taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) { @@ -153,11 +150,6 @@ public int compare(Pair left, Pair right) continue; } - if (failedToReacquireLockTasks.contains(task)) { - log.info("Ignoring task[%s] as it failed to acquire at least one of its locks", task.getId()); - continue; - } - // Create a new taskLock if it doesn't have a proper priority, // so that every taskLock in memory has the priority. final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null @@ -191,7 +183,7 @@ public int compare(Pair left, Pair right) ); } } else { - failedToReacquireLockTasks.add(task); + failedToReacquireLockTaskGroups.add(task.getGroupId()); log.error( "Could not reacquire lock on interval[%s] version[%s] for task: %s.", savedTaskLockWithPriority.getInterval(), @@ -200,6 +192,16 @@ public int compare(Pair left, Pair right) ); } } + + Set tasksToFail = new HashSet<>(); + for (Task task : taskStorage.getActiveTasks()) { + if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) { + tasksToFail.add(task); + } else { + activeTasks.add(task.getId()); + } + } + log.info( "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).", taskLockCount, @@ -207,18 +209,8 @@ public int compare(Pair left, Pair right) storedLocks.size() - taskLockCount ); - // unlock all TaskLockPosse created, and also remove lock entries from storage - for (Task task : failedToReacquireLockTasks) { - unlockAll(task); - for (TaskLock lock : taskStorage.getLocks(task.getId())) { - taskStorage.removeLock(task.getId(), lock); - } - } - return new SyncResult( - ImmutableList.copyOf(failedToReacquireLockTasks) - ); + return new SyncResult(tasksToFail); } - finally { giant.unlock(); } @@ -233,6 +225,10 @@ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskL { giant.lock(); + if (task.getGroupId().contains("zzz")) { + return null; + } + try { Preconditions.checkArgument( task.getGroupId().equals(taskLock.getGroupId()), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 7690c9202bd8..98a117745a90 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -115,8 +115,8 @@ public void becomeLeader() try { SyncResult syncResult = taskLockbox.syncFromStorage(); + processSyncResult(syncResult, taskStorage, taskLockbox); taskRunner = runnerFactory.build(); - processSyncResult(syncResult, taskStorage, taskRunner); taskQueue = new TaskQueue( taskLockConfig, taskQueueConfig, @@ -420,20 +420,33 @@ public Map getBlacklistedTaskSlotCount() } /** + * * Process the results of synchronization from storage * * @param syncResult to be processed - * @param taskStorage stores tasks - * @param taskRunner runs tasks and holds state + * @param taskStorage stores tasks and related entities + * @param taskLockbox manages task locking */ - private void processSyncResult(SyncResult syncResult, TaskStorage taskStorage, TaskRunner taskRunner) + private void processSyncResult(SyncResult syncResult, TaskStorage taskStorage, TaskLockbox taskLockbox) { String taskReacquisitionFailure = "Failed to reacquire lock"; for (Task task : syncResult.getTasksToFail()) { - // Shutdown task if it is running - taskRunner.shutdown(task.getId(), taskReacquisitionFailure); - // Mark as failed - taskStorage.setStatus(TaskStatus.failure(task.getId(), taskReacquisitionFailure)); + // Mark as failed all tasks + try { + taskStorage.setStatus(TaskStatus.failure(task.getId(), taskReacquisitionFailure)); + } + catch (Throwable e) { + log.warn(e, "Failed to mark task [%s] as failed", task.getId()); + } + } + for (Task task : syncResult.getTasksToFail()) { + // Mark as failed all tasks + try { + taskLockbox.unlockAll(task); + } + catch (Throwable e) { + log.warn(e, "Failed to unlock all locks for task [%s]", task.getId()); + } } } } From e9b4c51550abb2239f5218d021740b0abe6775e8 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 12 Oct 2022 16:02:57 +0530 Subject: [PATCH 4/8] Process SyncResult in TaskQueue --- .../druid/indexing/overlord/TaskLockbox.java | 33 ++++-- .../druid/indexing/overlord/TaskMaster.java | 36 +------ .../druid/indexing/overlord/TaskQueue.java | 20 +++- .../indexing/overlord/TaskLifecycleTest.java | 2 +- .../indexing/overlord/TaskLockConfigTest.java | 3 +- .../indexing/overlord/TaskLockboxTest.java | 47 ++++---- .../indexing/overlord/TaskQueueScaleTest.java | 3 +- .../indexing/overlord/TaskQueueTest.java | 24 +++-- .../indexing/overlord/http/OverlordTest.java | 100 ++++++++++++------ 9 files changed, 156 insertions(+), 112 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 01aa74ec8c42..4cd5436f0c1e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -117,8 +117,11 @@ public SyncResult syncFromStorage() try { // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. + running.clear(); + activeTasks.clear(); final List> storedLocks = new ArrayList<>(); for (final Task task : taskStorage.getActiveTasks()) { + activeTasks.add(task.getId()); for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { storedLocks.add(Pair.of(task, taskLock)); } @@ -136,8 +139,6 @@ public int compare(Pair left, Pair right) .result(); } }; - running.clear(); - activeTasks.clear(); final Set failedToReacquireLockTaskGroups = new HashSet<>(); // Bookkeeping for a log message at the end int taskLockCount = 0; @@ -190,6 +191,7 @@ public int compare(Pair left, Pair right) savedTaskLockWithPriority.getVersion(), task.getId() ); + continue; } } @@ -197,8 +199,7 @@ public int compare(Pair left, Pair right) for (Task task : taskStorage.getActiveTasks()) { if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) { tasksToFail.add(task); - } else { - activeTasks.add(task.getId()); + activeTasks.remove(task.getId()); } } @@ -225,10 +226,6 @@ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskL { giant.lock(); - if (task.getGroupId().contains("zzz")) { - return null; - } - try { Preconditions.checkArgument( task.getGroupId().equals(taskLock.getGroupId()), @@ -876,6 +873,26 @@ public void unlockAll(Task task) } } + public void unlockUnacquiredLocks(Task task) + { + giant.lock(); + try { + for (TaskLock lock : taskStorage.getLocks(task.getId())) { + // Clean up entries for locks for which lock couldn't be acquired + try { + taskStorage.removeLock(task.getId(), lock); + lockReleaseCondition.signalAll(); + } + catch (Throwable e) { + log.warn(e, "Failed to unlock lock for task [%s]", task.getId()); + } + } + } + finally { + giant.unlock(); + } + } + public void add(Task task) { giant.lock(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 98a117745a90..3fff3485c562 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -26,7 +26,6 @@ import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.discovery.DruidLeaderSelector.Listener; import org.apache.druid.guice.annotations.Self; -import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.Task; @@ -115,7 +114,6 @@ public void becomeLeader() try { SyncResult syncResult = taskLockbox.syncFromStorage(); - processSyncResult(syncResult, taskStorage, taskLockbox); taskRunner = runnerFactory.build(); taskQueue = new TaskQueue( taskLockConfig, @@ -125,7 +123,8 @@ public void becomeLeader() taskRunner, taskActionClientFactory, taskLockbox, - emitter + emitter, + syncResult ); // Sensible order to start stuff: @@ -418,35 +417,4 @@ public Map getBlacklistedTaskSlotCount() return null; } } - - /** - * - * Process the results of synchronization from storage - * - * @param syncResult to be processed - * @param taskStorage stores tasks and related entities - * @param taskLockbox manages task locking - */ - private void processSyncResult(SyncResult syncResult, TaskStorage taskStorage, TaskLockbox taskLockbox) - { - String taskReacquisitionFailure = "Failed to reacquire lock"; - for (Task task : syncResult.getTasksToFail()) { - // Mark as failed all tasks - try { - taskStorage.setStatus(TaskStatus.failure(task.getId(), taskReacquisitionFailure)); - } - catch (Throwable e) { - log.warn(e, "Failed to mark task [%s] as failed", task.getId()); - } - } - for (Task task : syncResult.getTasksToFail()) { - // Mark as failed all tasks - try { - taskLockbox.unlockAll(task); - } - catch (Throwable e) { - log.warn(e, "Failed to unlock all locks for task [%s]", task.getId()); - } - } - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 07f136c72373..9c4f5553b300 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -34,6 +34,7 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; +import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; @@ -100,6 +101,10 @@ public class TaskQueue @GuardedBy("giant") private final Set recentlyCompletedTasks = new HashSet<>(); + // Tasks to be forcefully failed due to lock reacquisition failure + @GuardedBy("giant") + private final Set tasksToFail; + private final TaskLockConfig lockConfig; private final TaskQueueConfig config; private final DefaultTaskConfig defaultTaskConfig; @@ -142,7 +147,8 @@ public TaskQueue( TaskRunner taskRunner, TaskActionClientFactory taskActionClientFactory, TaskLockbox taskLockbox, - ServiceEmitter emitter + ServiceEmitter emitter, + SyncResult syncResult ) { this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig"); @@ -153,6 +159,7 @@ public TaskQueue( this.taskActionClientFactory = Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory"); this.taskLockbox = Preconditions.checkNotNull(taskLockbox, "taskLockbox"); this.emitter = Preconditions.checkNotNull(emitter, "emitter"); + this.tasksToFail = syncResult == null ? new HashSet<>() : syncResult.getTasksToFail(); } @VisibleForTesting @@ -770,6 +777,17 @@ private void syncFromStorage() addTaskInternal(task); } + for (Task task : tasksToFail) { + try { + tasks.putIfAbsent(task.getId(), task); + shutdown(task.getId(), "Failed to reacquire lock"); + } + catch (Throwable e) { + log.warn(e, "Failed to shutdown task[%s]", task.getId()); + } + } + tasksToFail.clear(); + log.info( "Synced %d tasks from storage (%d tasks added, %d tasks removed).", tasksSynced, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 8d4e2966486d..414b2ba70377 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -730,7 +730,7 @@ private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception TaskQueueConfig.class ); - return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, tac, taskLockbox, emitter); + return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, tac, taskLockbox, emitter, null); } @After diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java index b158eaca7797..139e2010ed16 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java @@ -121,7 +121,8 @@ public boolean isForceTimeChunkLock() taskRunner, actionClientFactory, lockbox, - emitter + emitter, + null ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index b14d2080a1f8..a3b558bb0b3c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; @@ -1261,36 +1262,34 @@ public void testGetLockedIntervalsForRevokedLocks() throws Exception @Test public void testFailedToReacquireTaskLock() throws Exception { - final Task task = NoopTask.create(); - taskStorage.insert(task, TaskStatus.running(task.getId())); + final Task badTask0 = NoopTask.withGroupId("BadTask"); + final Task badTask1 = NoopTask.withGroupId("BadTask"); + final Task goodTask0 = NoopTask.withGroupId("GoodTask"); + taskStorage.insert(badTask0, TaskStatus.running(badTask0.getId())); + taskStorage.insert(badTask1, TaskStatus.running(badTask1.getId())); + taskStorage.insert(goodTask0, TaskStatus.running(goodTask0.getId())); TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator); - testLockbox.add(task); - testLockbox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, - task, - Intervals.of("2017-05-01/2017-06-01"), - null) - ); - testLockbox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, - task, - Intervals.of("2017-06-01/2017-07-01"), - null) + testLockbox.add(badTask0); + testLockbox.add(badTask1); + testLockbox.add(goodTask0); + + testLockbox.tryLock(badTask0, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + badTask0, + Intervals.of("2017-07-01/2017-08-01"), + null) ); - testLockbox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, - task, - Intervals.of("2017-07-01/2017-08-01"), - null) + + testLockbox.tryLock(goodTask0, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + goodTask0, + Intervals.of("2017-07-01/2017-08-01"), + null) ); - Assert.assertEquals(1, taskStorage.getActiveTasks().size()); - Assert.assertEquals(3, taskStorage.getLocks(task.getId()).size()); + Assert.assertEquals(3, taskStorage.getActiveTasks().size()); // The task must be marked for failure SyncResult result = testLockbox.syncFromStorage(); - Assert.assertEquals(1, result.getTasksToFail().size()); - Assert.assertEquals(task, result.getTasksToFail().get(0)); - - // Task must no longer have active locks - Assert.assertEquals(0, taskStorage.getLocks(task.getId()).size()); + Assert.assertEquals(ImmutableSet.of(badTask0, badTask1), result.getTasksToFail()); } private Set getAllLocks(List tasks) @@ -1432,7 +1431,7 @@ public NullLockPosseTaskLockbox( @Override protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) { - return null; + return task.getGroupId().contains("BadTask") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock); } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index d305b0d6c9b2..e6098c843e2f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -122,7 +122,8 @@ public RetType submit(TaskAction taskAction) taskRunner, unsupportedTaskActionFactory, // Not used for anything serious new TaskLockbox(taskStorage, storageCoordinator), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.start(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index a90ad8b3c090..7cf0f540d588 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -86,7 +86,8 @@ public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.setActive(true); // task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting @@ -131,7 +132,8 @@ public void testShutdownReleasesTaskLock() throws Exception new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.setActive(true); @@ -171,7 +173,8 @@ public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExist new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -206,7 +209,8 @@ public Map getContext() new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -231,7 +235,8 @@ public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocat new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.setActive(true); final Task task = new TestTask( @@ -273,7 +278,8 @@ public Map getContext() new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -296,7 +302,8 @@ public void testUserProvidedContextOverrideLockConfig() throws EntryExistsExcept new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.setActive(true); final Task task = new TestTask( @@ -326,7 +333,8 @@ public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsExc new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + null ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index d0652d3aa60f..371956e0d631 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -38,12 +38,15 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskRunner; @@ -59,6 +62,8 @@ import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -82,6 +87,7 @@ import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -103,11 +109,15 @@ public class OverlordTest private CountDownLatch announcementLatch; private DruidNode druidNode; private OverlordResource overlordResource; - private CountDownLatch[] taskCompletionCountDownLatches; - private CountDownLatch[] runTaskCountDownLatches; + private Map taskCompletionCountDownLatches; + private Map runTaskCountDownLatches; private HttpServletRequest req; private SupervisorManager supervisorManager; + // Bad task's id must be lexicographically greater than the good task's + private final String goodTaskId = "aaa"; + private final String badTaskId = "zzz"; + private void setupServerAndCurator() throws Exception { server = new TestingServer(); @@ -140,38 +150,52 @@ public void setUp() throws Exception req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); EasyMock.expectLastCall().anyTimes(); supervisorManager = EasyMock.createMock(SupervisorManager.class); - taskLockbox = EasyMock.createStrictMock(TaskLockbox.class); - taskLockbox.syncFromStorage(); - EasyMock.expectLastCall().atLeastOnce(); - taskLockbox.add(EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - taskLockbox.remove(EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - - // for second Noop Task directly added to deep storage. - taskLockbox.add(EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - taskLockbox.remove(EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); taskActionClientFactory = EasyMock.createStrictMock(TaskActionClientFactory.class); EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())) .andReturn(null).anyTimes(); - EasyMock.replay(taskLockbox, taskActionClientFactory, req); + EasyMock.replay(taskActionClientFactory, req); taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); - runTaskCountDownLatches = new CountDownLatch[2]; - runTaskCountDownLatches[0] = new CountDownLatch(1); - runTaskCountDownLatches[1] = new CountDownLatch(1); - taskCompletionCountDownLatches = new CountDownLatch[2]; - taskCompletionCountDownLatches[0] = new CountDownLatch(1); - taskCompletionCountDownLatches[1] = new CountDownLatch(1); + + IndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + + taskLockbox = new TaskLockbox(taskStorage, mdc); + + runTaskCountDownLatches = new HashMap<>(); + runTaskCountDownLatches.put("0", new CountDownLatch(1)); + runTaskCountDownLatches.put("1", new CountDownLatch(1)); + taskCompletionCountDownLatches = new HashMap<>(); + taskCompletionCountDownLatches.put("0", new CountDownLatch(1)); + taskCompletionCountDownLatches.put("1", new CountDownLatch(1)); announcementLatch = new CountDownLatch(1); setupServerAndCurator(); curator.start(); curator.blockUntilConnected(); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); ServiceEmitter serviceEmitter = new NoopServiceEmitter(); + + // Add two task with conflicting locks + // The "BadTask" must be failed + Task badTask = new NoopTask(badTaskId, badTaskId, "datasource", 10_000, 0, null, null, null); + TaskLock badLock = new TimeChunkLock(null, badTaskId, "datasource", Intervals.ETERNITY, "version1", 50); + Task goodTask = new NoopTask(goodTaskId, goodTaskId, "datasource", 0, 0, null, null, null); + TaskLock goodLock = new TimeChunkLock(null, goodTaskId, "datasource", Intervals.ETERNITY, "version0", 50); + taskStorage.insert(goodTask, TaskStatus.running(goodTaskId)); + taskStorage.insert(badTask, TaskStatus.running(badTaskId)); + taskStorage.addLock(badTaskId, badLock); + taskStorage.addLock(goodTaskId, goodLock); + runTaskCountDownLatches.put(badTaskId, new CountDownLatch(1)); + runTaskCountDownLatches.put(goodTaskId, new CountDownLatch(1)); + taskCompletionCountDownLatches.put(badTaskId, new CountDownLatch(1)); + taskCompletionCountDownLatches.put(goodTaskId, new CountDownLatch(1)); + + TaskRunnerFactory taskRunnerFactory = (TaskRunnerFactory) () -> + new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches); + + taskRunnerFactory.build().run(badTask); + taskRunnerFactory.build().run(goodTask); + taskMaster = new TaskMaster( new TaskLockConfig(), new TaskQueueConfig(null, new Period(1), null, new Period(10)), @@ -180,8 +204,7 @@ public void setUp() throws Exception taskStorage, taskActionClientFactory, druidNode, - (TaskRunnerFactory) () -> - new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches), + taskRunnerFactory, new LatchableServiceAnnouncer(announcementLatch, null), new CoordinatorOverlordServiceConfig(null, null), serviceEmitter, @@ -222,6 +245,13 @@ public void testOverlordRun() throws Exception Response response = overlordResource.getLeader(); Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity()); + // BadTask must fail due to null task lock + waitForTaskStatus(badTaskId, TaskState.FAILED); + + // GoodTask must successfully run + taskCompletionCountDownLatches.get(goodTaskId).countDown(); + waitForTaskStatus(goodTaskId, TaskState.SUCCESS); + final String taskId_0 = "0"; NoopTask task_0 = NoopTask.create(taskId_0, 0); response = overlordResource.taskPost(task_0, req); @@ -249,7 +279,7 @@ public void testOverlordRun() throws Exception ); // Simulate completion of task_0 - taskCompletionCountDownLatches[Integer.parseInt(taskId_0)].countDown(); + taskCompletionCountDownLatches.get(taskId_0).countDown(); // Wait for taskQueue to handle success status of task_0 waitForTaskStatus(taskId_0, TaskState.SUCCESS); @@ -259,7 +289,7 @@ public void testOverlordRun() throws Exception NoopTask task_1 = NoopTask.create(taskId_1, 0); taskStorage.insert(task_1, TaskStatus.running(taskId_1)); // Wait for task runner to run task_1 - runTaskCountDownLatches[Integer.parseInt(taskId_1)].await(); + runTaskCountDownLatches.get(taskId_1).await(); response = overlordResource.getRunningTasks(null, req); // 1 task that was manually inserted should be in running state @@ -270,19 +300,20 @@ public void testOverlordRun() throws Exception Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation()); // Simulate completion of task_1 - taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown(); + taskCompletionCountDownLatches.get(taskId_1).countDown(); // Wait for taskQueue to handle success status of task_1 waitForTaskStatus(taskId_1, TaskState.SUCCESS); // should return number of tasks which are not in running state response = overlordResource.getCompleteTasks(null, req); - Assert.assertEquals(2, (((List) response.getEntity()).size())); + Assert.assertEquals(4, (((List) response.getEntity()).size())); response = overlordResource.getCompleteTasks(1, req); Assert.assertEquals(1, (((List) response.getEntity()).size())); + taskMaster.stop(); Assert.assertFalse(taskMaster.isLeader()); - EasyMock.verify(taskLockbox, taskActionClientFactory); + EasyMock.verify(taskActionClientFactory); } /* Wait until the task with given taskId has the given Task Status @@ -308,12 +339,12 @@ public void tearDown() public static class MockTaskRunner implements TaskRunner { - private CountDownLatch[] completionLatches; - private CountDownLatch[] runLatches; + private Map completionLatches; + private Map runLatches; private ConcurrentHashMap taskRunnerWorkItems; private List runningTasks; - public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLatches) + public MockTaskRunner(Map runLatches, Map completionLatches) { this.runLatches = runLatches; this.completionLatches = completionLatches; @@ -367,11 +398,11 @@ public TaskStatus call() throws Exception // this is equivalent of getting process holder to run task in ForkingTaskRunner runningTasks.add(taskId); if (runLatches != null) { - runLatches[Integer.parseInt(taskId)].countDown(); + runLatches.get(taskId).countDown(); } // Wait for completion count down if (completionLatches != null) { - completionLatches[Integer.parseInt(taskId)].await(); + completionLatches.get(taskId).await(); } taskRunnerWorkItems.remove(taskId); runningTasks.remove(taskId); @@ -407,6 +438,7 @@ public String getDataSource() @Override public void shutdown(String taskid, String reason) { + runningTasks.remove(taskid); } @Override From d17114a7ff97cb22e33ea16cd4d8ae9aecb137c2 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 12 Oct 2022 20:36:17 +0530 Subject: [PATCH 5/8] Unused import --- .../main/java/org/apache/druid/indexing/overlord/TaskQueue.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 9c4f5553b300..7bd03143c424 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -34,7 +34,6 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; -import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; From 212a786d324f5ee65f389987bac208c74a3b1808 Mon Sep 17 00:00:00 2001 From: Amatya Date: Fri, 14 Oct 2022 11:35:41 +0530 Subject: [PATCH 6/8] Refactoring and clean-up --- .../druid/indexing/overlord/TaskLockbox.java | 34 ++++++------------- ...Result.java => TaskLockboxSyncResult.java} | 4 +-- .../druid/indexing/overlord/TaskMaster.java | 4 +-- .../druid/indexing/overlord/TaskQueue.java | 33 ++++++++---------- .../indexing/overlord/TaskLifecycleTest.java | 2 +- .../indexing/overlord/TaskLockConfigTest.java | 3 +- .../indexing/overlord/TaskLockboxTest.java | 2 +- .../indexing/overlord/TaskQueueScaleTest.java | 3 +- .../indexing/overlord/TaskQueueTest.java | 24 +++++-------- 9 files changed, 40 insertions(+), 69 deletions(-) rename indexing-service/src/main/java/org/apache/druid/indexing/overlord/{SyncResult.java => TaskLockboxSyncResult.java} (94%) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 4cd5436f0c1e..33f8818c5c0c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -111,7 +111,7 @@ public TaskLockbox( * * @return SyncResult which needs to be processed by the caller */ - public SyncResult syncFromStorage() + public TaskLockboxSyncResult syncFromStorage() { giant.lock(); @@ -186,10 +186,11 @@ public int compare(Pair left, Pair right) } else { failedToReacquireLockTaskGroups.add(task.getGroupId()); log.error( - "Could not reacquire lock on interval[%s] version[%s] for task: %s.", + "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.", savedTaskLockWithPriority.getInterval(), savedTaskLockWithPriority.getVersion(), - task.getId() + task.getId(), + task.getGroupId() ); continue; } @@ -210,7 +211,12 @@ public int compare(Pair left, Pair right) storedLocks.size() - taskLockCount ); - return new SyncResult(tasksToFail); + if (!failedToReacquireLockTaskGroups.isEmpty()) { + log.warn("Marking all tasks from task groups[%s] to be failed " + + "as they failed to reacquire at least one lock.", failedToReacquireLockTaskGroups); + } + + return new TaskLockboxSyncResult(tasksToFail); } finally { giant.unlock(); @@ -873,26 +879,6 @@ public void unlockAll(Task task) } } - public void unlockUnacquiredLocks(Task task) - { - giant.lock(); - try { - for (TaskLock lock : taskStorage.getLocks(task.getId())) { - // Clean up entries for locks for which lock couldn't be acquired - try { - taskStorage.removeLock(task.getId(), lock); - lockReleaseCondition.signalAll(); - } - catch (Throwable e) { - log.warn(e, "Failed to unlock lock for task [%s]", task.getId()); - } - } - } - finally { - giant.unlock(); - } - } - public void add(Task task) { giant.lock(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java similarity index 94% rename from indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java rename to indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java index 28eaeb71435c..a22a7eb98513 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SyncResult.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java @@ -27,11 +27,11 @@ * Result of TaskLockbox#syncFromStorage() * Contains tasks which need to be forcefully failed to let the overlord become the leader */ -class SyncResult +class TaskLockboxSyncResult { private final Set tasksToFail; - SyncResult(Set tasksToFail) + TaskLockboxSyncResult(Set tasksToFail) { this.tasksToFail = tasksToFail; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 3fff3485c562..7b9101cf1f22 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -113,7 +113,6 @@ public void becomeLeader() log.info("By the power of Grayskull, I have the power!"); try { - SyncResult syncResult = taskLockbox.syncFromStorage(); taskRunner = runnerFactory.build(); taskQueue = new TaskQueue( taskLockConfig, @@ -123,8 +122,7 @@ public void becomeLeader() taskRunner, taskActionClientFactory, taskLockbox, - emitter, - syncResult + emitter ); // Sensible order to start stuff: diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 7bd03143c424..d54e82084525 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -34,6 +34,7 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; +import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; @@ -100,10 +101,6 @@ public class TaskQueue @GuardedBy("giant") private final Set recentlyCompletedTasks = new HashSet<>(); - // Tasks to be forcefully failed due to lock reacquisition failure - @GuardedBy("giant") - private final Set tasksToFail; - private final TaskLockConfig lockConfig; private final TaskQueueConfig config; private final DefaultTaskConfig defaultTaskConfig; @@ -146,8 +143,7 @@ public TaskQueue( TaskRunner taskRunner, TaskActionClientFactory taskActionClientFactory, TaskLockbox taskLockbox, - ServiceEmitter emitter, - SyncResult syncResult + ServiceEmitter emitter ) { this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig"); @@ -158,7 +154,6 @@ public TaskQueue( this.taskActionClientFactory = Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory"); this.taskLockbox = Preconditions.checkNotNull(taskLockbox, "taskLockbox"); this.emitter = Preconditions.checkNotNull(emitter, "emitter"); - this.tasksToFail = syncResult == null ? new HashSet<>() : syncResult.getTasksToFail(); } @VisibleForTesting @@ -179,6 +174,12 @@ public void start() Preconditions.checkState(!active, "queue must be stopped"); active = true; syncFromStorage(); + // Mark these tasks as failed as they could not reacuire the lock + // Clean up needs to happen after tasks have been synced from storage + Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); + for (Task task : tasksToFail) { + shutdown(task.getId(), "Failed to reacquire lock."); + } managerExec.submit( new Runnable() { @@ -234,6 +235,13 @@ public ScheduledExecutors.Signal call() } ); requestManagement(); + // Remove any unacquired locks from storage + // This is called after requesting management as task failure occurs after notifyStatus is processed + for (Task task : tasksToFail) { + for (TaskLock lock : taskStorage.getLocks(task.getId())) { + taskStorage.removeLock(task.getId(), lock); + } + } } finally { giant.unlock(); @@ -776,17 +784,6 @@ private void syncFromStorage() addTaskInternal(task); } - for (Task task : tasksToFail) { - try { - tasks.putIfAbsent(task.getId(), task); - shutdown(task.getId(), "Failed to reacquire lock"); - } - catch (Throwable e) { - log.warn(e, "Failed to shutdown task[%s]", task.getId()); - } - } - tasksToFail.clear(); - log.info( "Synced %d tasks from storage (%d tasks added, %d tasks removed).", tasksSynced, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 414b2ba70377..8d4e2966486d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -730,7 +730,7 @@ private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception TaskQueueConfig.class ); - return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, tac, taskLockbox, emitter, null); + return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, tac, taskLockbox, emitter); } @After diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java index 139e2010ed16..b158eaca7797 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java @@ -121,8 +121,7 @@ public boolean isForceTimeChunkLock() taskRunner, actionClientFactory, lockbox, - emitter, - null + emitter ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index a3b558bb0b3c..9df77f0c98ca 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1288,7 +1288,7 @@ public void testFailedToReacquireTaskLock() throws Exception Assert.assertEquals(3, taskStorage.getActiveTasks().size()); // The task must be marked for failure - SyncResult result = testLockbox.syncFromStorage(); + TaskLockboxSyncResult result = testLockbox.syncFromStorage(); Assert.assertEquals(ImmutableSet.of(badTask0, badTask1), result.getTasksToFail()); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index e6098c843e2f..d305b0d6c9b2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -122,8 +122,7 @@ public RetType submit(TaskAction taskAction) taskRunner, unsupportedTaskActionFactory, // Not used for anything serious new TaskLockbox(taskStorage, storageCoordinator), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.start(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 7cf0f540d588..a90ad8b3c090 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -86,8 +86,7 @@ public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.setActive(true); // task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting @@ -132,8 +131,7 @@ public void testShutdownReleasesTaskLock() throws Exception new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.setActive(true); @@ -173,8 +171,7 @@ public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExist new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -209,8 +206,7 @@ public Map getContext() new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -235,8 +231,7 @@ public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocat new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.setActive(true); final Task task = new TestTask( @@ -278,8 +273,7 @@ public Map getContext() new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -302,8 +296,7 @@ public void testUserProvidedContextOverrideLockConfig() throws EntryExistsExcept new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.setActive(true); final Task task = new TestTask( @@ -333,8 +326,7 @@ public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsExc new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter(), - null + new NoopServiceEmitter() ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")) From 87bff8a584c3ed7d0d4ef20353531ae4a0712c70 Mon Sep 17 00:00:00 2001 From: Amatya Date: Fri, 14 Oct 2022 12:13:41 +0530 Subject: [PATCH 7/8] Revert changes to prevent a bad state --- .../org/apache/druid/indexing/overlord/TaskLockbox.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 33f8818c5c0c..d5f1d7e57e29 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -117,11 +117,10 @@ public TaskLockboxSyncResult syncFromStorage() try { // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. - running.clear(); - activeTasks.clear(); + final Set storedActiveTasks = new HashSet<>(); final List> storedLocks = new ArrayList<>(); for (final Task task : taskStorage.getActiveTasks()) { - activeTasks.add(task.getId()); + storedActiveTasks.add(task.getId()); for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { storedLocks.add(Pair.of(task, taskLock)); } @@ -139,6 +138,10 @@ public int compare(Pair left, Pair right) .result(); } }; + running.clear(); + activeTasks.clear(); + activeTasks.addAll(storedActiveTasks); + // Set of task groups in which at least one task failed to re-acquire a lock final Set failedToReacquireLockTaskGroups = new HashSet<>(); // Bookkeeping for a log message at the end int taskLockCount = 0; From 7f6918bf769677bff480e55788cf75eb106384da Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 17 Oct 2022 10:32:51 +0530 Subject: [PATCH 8/8] Refactoring and comments --- .../druid/indexing/overlord/TaskLockbox.java | 11 ++-- .../overlord/TaskLockboxSyncResult.java | 2 +- .../druid/indexing/overlord/TaskQueue.java | 7 +-- .../indexing/overlord/TaskLockboxTest.java | 54 +++++++++++-------- .../indexing/overlord/http/OverlordTest.java | 4 +- 5 files changed, 47 insertions(+), 31 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index d5f1d7e57e29..41caf0620fb3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -117,10 +117,10 @@ public TaskLockboxSyncResult syncFromStorage() try { // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. - final Set storedActiveTasks = new HashSet<>(); + final Set storedActiveTasks = new HashSet<>(); final List> storedLocks = new ArrayList<>(); for (final Task task : taskStorage.getActiveTasks()) { - storedActiveTasks.add(task.getId()); + storedActiveTasks.add(task); for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { storedLocks.add(Pair.of(task, taskLock)); } @@ -140,7 +140,10 @@ public int compare(Pair left, Pair right) }; running.clear(); activeTasks.clear(); - activeTasks.addAll(storedActiveTasks); + activeTasks.addAll(storedActiveTasks.stream() + .map(Task::getId) + .collect(Collectors.toSet()) + ); // Set of task groups in which at least one task failed to re-acquire a lock final Set failedToReacquireLockTaskGroups = new HashSet<>(); // Bookkeeping for a log message at the end @@ -200,7 +203,7 @@ public int compare(Pair left, Pair right) } Set tasksToFail = new HashSet<>(); - for (Task task : taskStorage.getActiveTasks()) { + for (Task task : storedActiveTasks) { if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) { tasksToFail.add(task); activeTasks.remove(task.getId()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java index a22a7eb98513..b7273b6bdef1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java @@ -37,7 +37,7 @@ class TaskLockboxSyncResult } /** - * Return list of tasks whose status needs to be set to failed due to lock re-acquisition failure + * Return set of tasks which need to be forcefully failed due to lock re-acquisition failure */ Set getTasksToFail() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index d54e82084525..c508876f0ce4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -178,7 +178,8 @@ public void start() // Clean up needs to happen after tasks have been synced from storage Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); for (Task task : tasksToFail) { - shutdown(task.getId(), "Failed to reacquire lock."); + shutdown(task.getId(), + "Shutting down forcefully as task failed to reacquire lock while becoming leader"); } managerExec.submit( new Runnable() @@ -235,8 +236,8 @@ public ScheduledExecutors.Signal call() } ); requestManagement(); - // Remove any unacquired locks from storage - // This is called after requesting management as task failure occurs after notifyStatus is processed + // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired) + // This is called after requesting management as locks need to be cleared after notifyStatus is processed for (Task task : tasksToFail) { for (TaskLock lock : taskStorage.getLocks(task.getId())) { taskStorage.removeLock(task.getId(), lock); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 9df77f0c98ca..834bdf9429d6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1262,34 +1262,42 @@ public void testGetLockedIntervalsForRevokedLocks() throws Exception @Test public void testFailedToReacquireTaskLock() throws Exception { - final Task badTask0 = NoopTask.withGroupId("BadTask"); - final Task badTask1 = NoopTask.withGroupId("BadTask"); - final Task goodTask0 = NoopTask.withGroupId("GoodTask"); - taskStorage.insert(badTask0, TaskStatus.running(badTask0.getId())); - taskStorage.insert(badTask1, TaskStatus.running(badTask1.getId())); - taskStorage.insert(goodTask0, TaskStatus.running(goodTask0.getId())); + // Tasks to be failed have a group id with the substring "FailingLockAcquisition" + // Please refer to NullLockPosseTaskLockbox + final Task taskWithFailingLockAcquisition0 = NoopTask.withGroupId("FailingLockAcquisition"); + final Task taskWithFailingLockAcquisition1 = NoopTask.withGroupId("FailingLockAcquisition"); + final Task taskWithSuccessfulLockAcquisition = NoopTask.create(); + taskStorage.insert(taskWithFailingLockAcquisition0, TaskStatus.running(taskWithFailingLockAcquisition0.getId())); + taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId())); + taskStorage.insert(taskWithSuccessfulLockAcquisition, TaskStatus.running(taskWithSuccessfulLockAcquisition.getId())); TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator); - testLockbox.add(badTask0); - testLockbox.add(badTask1); - testLockbox.add(goodTask0); - - testLockbox.tryLock(badTask0, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, - badTask0, - Intervals.of("2017-07-01/2017-08-01"), - null) + testLockbox.add(taskWithFailingLockAcquisition0); + testLockbox.add(taskWithFailingLockAcquisition1); + testLockbox.add(taskWithSuccessfulLockAcquisition); + + testLockbox.tryLock(taskWithFailingLockAcquisition0, + new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + taskWithFailingLockAcquisition0, + Intervals.of("2017-07-01/2017-08-01"), + null + ) ); - testLockbox.tryLock(goodTask0, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, - goodTask0, - Intervals.of("2017-07-01/2017-08-01"), - null) + testLockbox.tryLock(taskWithSuccessfulLockAcquisition, + new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, + taskWithSuccessfulLockAcquisition, + Intervals.of("2017-07-01/2017-08-01"), + null + ) ); + Assert.assertEquals(3, taskStorage.getActiveTasks().size()); - // The task must be marked for failure + // The tasks must be marked for failure TaskLockboxSyncResult result = testLockbox.syncFromStorage(); - Assert.assertEquals(ImmutableSet.of(badTask0, badTask1), result.getTasksToFail()); + Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0, taskWithFailingLockAcquisition1), + result.getTasksToFail()); } private Set getAllLocks(List tasks) @@ -1418,6 +1426,9 @@ public TaskStatus run(TaskToolbox toolbox) } } + /** + * Extends TaskLockbox to return a null TaskLockPosse when the task's group name contains "FailingLockAcquisition". + */ private static class NullLockPosseTaskLockbox extends TaskLockbox { public NullLockPosseTaskLockbox( @@ -1431,7 +1442,8 @@ public NullLockPosseTaskLockbox( @Override protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) { - return task.getGroupId().contains("BadTask") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock); + return task.getGroupId() + .contains("FailingLockAcquisition") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock); } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 371956e0d631..54aacc5dfe08 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -175,8 +175,8 @@ public void setUp() throws Exception druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); ServiceEmitter serviceEmitter = new NoopServiceEmitter(); - // Add two task with conflicting locks - // The "BadTask" must be failed + // Add two tasks with conflicting locks + // The bad task (The one with a lexicographically larger name) must be failed Task badTask = new NoopTask(badTaskId, badTaskId, "datasource", 10_000, 0, null, null, null); TaskLock badLock = new TimeChunkLock(null, badTaskId, "datasource", Intervals.ETERNITY, "version1", 50); Task goodTask = new NoopTask(goodTaskId, goodTaskId, "datasource", 0, 0, null, null, null);