From 5c68b0bcce40b624ac3e1c7b13a1907abe3fdb9b Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 15 Nov 2023 11:26:30 +0530 Subject: [PATCH 1/3] Fetch active task paylaods from memory --- .../druid/indexing/overlord/TaskLockbox.java | 27 ++++++++++--------- .../overlord/TaskStorageQueryAdapter.java | 9 ++++++- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 54191adf05df..47a1110ea1c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -99,9 +99,9 @@ public class TaskLockbox private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); - // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks. - // this set should be accessed under the giant lock. - private final Set activeTasks = new HashSet<>(); + // Stores Active Tasks. TaskLockbox will only grant locks to active activeTasks. + // This should be accessed under the giant lock. + private final Map activeTasks = new HashMap(); @Inject public TaskLockbox( @@ -147,10 +147,7 @@ public int compare(Pair left, Pair right) }; running.clear(); activeTasks.clear(); - activeTasks.addAll(storedActiveTasks.stream() - .map(Task::getId) - .collect(Collectors.toSet()) - ); + storedActiveTasks.forEach(storedActiveTask -> activeTasks.put(storedActiveTask.getId(), storedActiveTask)); // Set of task groups in which at least one task failed to re-acquire a lock final Set failedToReacquireLockTaskGroups = new HashSet<>(); // Bookkeeping for a log message at the end @@ -379,7 +376,7 @@ public LockResult tryLock(final Task task, final LockRequest request) giant.lock(); try { - if (!activeTasks.contains(task.getId())) { + if (!activeTasks.containsKey(task.getId())) { throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId()); } Preconditions.checkArgument(request.getInterval().toDurationMillis() > 0, "interval empty"); @@ -518,7 +515,7 @@ public List allocateSegments( private void verifyTaskIsActive(SegmentAllocationHolder holder) { final String taskId = holder.task.getId(); - if (!activeTasks.contains(taskId)) { + if (!activeTasks.containsKey(taskId)) { holder.markFailed("Unable to grant lock to inactive Task [%s]", taskId); } } @@ -840,7 +837,7 @@ public void revokeLock(String taskId, TaskLock lock) giant.lock(); try { - if (!activeTasks.contains(taskId)) { + if (!activeTasks.containsKey(taskId)) { throw new ISE("Cannot revoke lock for inactive task[%s]", taskId); } @@ -1202,7 +1199,7 @@ public void add(Task task) giant.lock(); try { log.info("Adding task[%s] to activeTasks", task.getId()); - activeTasks.add(task.getId()); + activeTasks.put(task.getId(), task); } finally { giant.unlock(); @@ -1363,7 +1360,13 @@ List getOnlyTaskLockPosseContainingInterval(Task task, Interval i @VisibleForTesting Set getActiveTasks() { - return activeTasks; + return activeTasks.keySet(); + } + + @Nullable + Task getActiveTask(String taskId) + { + return activeTasks.get(taskId); } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index 140d9b7ac404..152faadcb871 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -104,7 +104,14 @@ public List getTaskStatusPlusList( public Optional getTask(final String taskid) { - return storage.getTask(taskid); + // Try to fetch active task from memory + final Task activeTask = taskLockbox.getActiveTask(taskid); + if (activeTask != null) { + return Optional.of(activeTask); + } else { + // fallback to db + return storage.getTask(taskid); + } } public Optional getStatus(final String taskid) From a64d56b5dc7365a00e62ec28e0d3f34612ab1872 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 15 Nov 2023 13:10:32 +0530 Subject: [PATCH 2/3] Utilize in-memory task map in TaskQueue --- .../druid/indexing/overlord/TaskLockbox.java | 27 +++++++++---------- .../druid/indexing/overlord/TaskQueue.java | 19 +++++++++++++ .../overlord/TaskStorageQueryAdapter.java | 11 ++++---- .../indexing/overlord/TaskLifecycleTest.java | 5 +++- .../indexing/overlord/http/OverlordTest.java | 2 +- 5 files changed, 41 insertions(+), 23 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 47a1110ea1c8..54191adf05df 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -99,9 +99,9 @@ public class TaskLockbox private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); - // Stores Active Tasks. TaskLockbox will only grant locks to active activeTasks. - // This should be accessed under the giant lock. - private final Map activeTasks = new HashMap(); + // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks. + // this set should be accessed under the giant lock. + private final Set activeTasks = new HashSet<>(); @Inject public TaskLockbox( @@ -147,7 +147,10 @@ public int compare(Pair left, Pair right) }; running.clear(); activeTasks.clear(); - storedActiveTasks.forEach(storedActiveTask -> activeTasks.put(storedActiveTask.getId(), storedActiveTask)); + activeTasks.addAll(storedActiveTasks.stream() + .map(Task::getId) + .collect(Collectors.toSet()) + ); // Set of task groups in which at least one task failed to re-acquire a lock final Set failedToReacquireLockTaskGroups = new HashSet<>(); // Bookkeeping for a log message at the end @@ -376,7 +379,7 @@ public LockResult tryLock(final Task task, final LockRequest request) giant.lock(); try { - if (!activeTasks.containsKey(task.getId())) { + if (!activeTasks.contains(task.getId())) { throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId()); } Preconditions.checkArgument(request.getInterval().toDurationMillis() > 0, "interval empty"); @@ -515,7 +518,7 @@ public List allocateSegments( private void verifyTaskIsActive(SegmentAllocationHolder holder) { final String taskId = holder.task.getId(); - if (!activeTasks.containsKey(taskId)) { + if (!activeTasks.contains(taskId)) { holder.markFailed("Unable to grant lock to inactive Task [%s]", taskId); } } @@ -837,7 +840,7 @@ public void revokeLock(String taskId, TaskLock lock) giant.lock(); try { - if (!activeTasks.containsKey(taskId)) { + if (!activeTasks.contains(taskId)) { throw new ISE("Cannot revoke lock for inactive task[%s]", taskId); } @@ -1199,7 +1202,7 @@ public void add(Task task) giant.lock(); try { log.info("Adding task[%s] to activeTasks", task.getId()); - activeTasks.put(task.getId(), task); + activeTasks.add(task.getId()); } finally { giant.unlock(); @@ -1360,13 +1363,7 @@ List getOnlyTaskLockPosseContainingInterval(Task task, Interval i @VisibleForTesting Set getActiveTasks() { - return activeTasks.keySet(); - } - - @Nullable - Task getActiveTask(String taskId) - { - return activeTasks.get(taskId); + return activeTasks; } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index ae0708344db4..1666d5d0ae86 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -947,6 +947,25 @@ public CoordinatorRunStats getQueueStats() return stats; } + public Optional getTask(String id) + { + Task activeTask; + + giant.lock(); + try { + activeTask = tasks.get(id); + } + finally { + giant.unlock(); + } + + if (activeTask == null) { + return taskStorage.getTask(id); + } else { + return Optional.of(activeTask); + } + } + @VisibleForTesting List getTasks() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index 152faadcb871..65d74a3de189 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -48,12 +48,14 @@ public class TaskStorageQueryAdapter { private final TaskStorage storage; private final TaskLockbox taskLockbox; + private final Optional taskQueue; @Inject - public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox) + public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) { this.storage = storage; this.taskLockbox = taskLockbox; + this.taskQueue = taskMaster.getTaskQueue(); } public List getActiveTasks() @@ -104,12 +106,9 @@ public List getTaskStatusPlusList( public Optional getTask(final String taskid) { - // Try to fetch active task from memory - final Task activeTask = taskLockbox.getActiveTask(taskid); - if (activeTask != null) { - return Optional.of(activeTask); + if (taskQueue.isPresent()) { + return taskQueue.get().getTask(taskid); } else { - // fallback to db return storage.getTask(taskid); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 69f0039f615a..572364a56a28 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -482,7 +482,10 @@ private TaskStorage setUpTaskStorage() default: throw new RE("Unknown task storage type [%s]", taskStorageType); } - tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox); + TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); return taskStorage; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 12be2ceafa61..f9ce36df1815 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -257,7 +257,7 @@ public void testOverlordRun() throws Exception Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation()); - final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox); + final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); // Test Overlord resource stuff overlordResource = new OverlordResource( From 7412ed373253bf5acf2c2fbe6382b19df4f0b78b Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 16 Nov 2023 11:05:19 +0530 Subject: [PATCH 3/3] Fetch only active tasks from TaskQueue --- .../apache/druid/indexing/overlord/TaskQueue.java | 12 ++---------- .../indexing/overlord/TaskStorageQueryAdapter.java | 8 +++++--- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 1666d5d0ae86..69762ba7190c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -947,23 +947,15 @@ public CoordinatorRunStats getQueueStats() return stats; } - public Optional getTask(String id) + public Optional getActiveTask(String id) { - Task activeTask; - giant.lock(); try { - activeTask = tasks.get(id); + return Optional.fromNullable(tasks.get(id)); } finally { giant.unlock(); } - - if (activeTask == null) { - return taskStorage.getTask(id); - } else { - return Optional.of(activeTask); - } } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index 65d74a3de189..ba2ca3c7066a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -107,10 +107,12 @@ public List getTaskStatusPlusList( public Optional getTask(final String taskid) { if (taskQueue.isPresent()) { - return taskQueue.get().getTask(taskid); - } else { - return storage.getTask(taskid); + Optional activeTask = taskQueue.get().getActiveTask(taskid); + if (activeTask.isPresent()) { + return activeTask; + } } + return storage.getTask(taskid); } public Optional getStatus(final String taskid)