From 0c937614b1ba57fb17d74c2c464a98717ec182e9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 7 Jul 2023 03:20:50 -0700 Subject: [PATCH 1/5] More efficient generation of ImmutableWorkerInfo from WorkerHolder. Taking the work done in #12096 a little further: 1) Apply a similar optimization to WorkerHolder (HttpRemoteTaskRunner). The original patch only helped with the ZkWorker (RemoteTaskRunner). 2) Improve the ZkWorker version somewhat by avoiding multiple iterations through the task announcements map. --- .../overlord/ImmutableWorkerInfo.java | 37 ++++++++++++++- .../druid/indexing/overlord/ZkWorker.java | 9 +--- .../indexing/overlord/hrtr/WorkerHolder.java | 47 +------------------ 3 files changed, 40 insertions(+), 53 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java index aaea3f453d25..a4cdb33e0ed8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -25,11 +25,13 @@ import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; +import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; import org.joda.time.DateTime; import javax.annotation.Nullable; import java.util.Collection; +import java.util.Map; import java.util.Set; /** @@ -76,7 +78,8 @@ public ImmutableWorkerInfo( ) { this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups, - runningTasks, lastCompletedTaskTime, null); + runningTasks, lastCompletedTaskTime, null + ); } public ImmutableWorkerInfo( @@ -90,6 +93,38 @@ public ImmutableWorkerInfo( this(worker, currCapacityUsed, 0, 
availabilityGroups, runningTasks, lastCompletedTaskTime, null); } + public static ImmutableWorkerInfo fromWorkerAnnouncements( + final Worker worker, + final Map announcements, + final DateTime lastCompletedTaskTime, + @Nullable final DateTime blacklistedUntil + ) + { + int currCapacity = 0; + int currParallelIndexCapacity = 0; + ImmutableSet.Builder availabilityGroups = ImmutableSet.builder(); + + for (final TaskAnnouncement announcement : announcements.values()) { + currCapacity += announcement.getTaskResource().getRequiredCapacity(); + + if (ParallelIndexSupervisorTask.TYPE.equals(announcement.getTaskType())) { + currParallelIndexCapacity += announcement.getTaskResource().getRequiredCapacity(); + } + + availabilityGroups.add(announcement.getTaskResource().getAvailabilityGroup()); + } + + return new ImmutableWorkerInfo( + worker, + currCapacity, + currParallelIndexCapacity, + availabilityGroups.build(), + ImmutableSet.copyOf(announcements.keySet()), + lastCompletedTaskTime, + blacklistedUntil + ); + } + @JsonProperty("worker") public Worker getWorker() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java index 67d30ebc8d48..dadb557e84c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java @@ -229,14 +229,9 @@ public void setBlacklistedUntil(DateTime blacklistedUntil) public ImmutableWorkerInfo toImmutable() { - Map tasks = getRunningTasks(); - - return new ImmutableWorkerInfo( + return ImmutableWorkerInfo.fromWorkerAnnouncements( worker.get(), - getCurrCapacityUsed(tasks), - getCurrParallelIndexCapacityUsed(tasks), - getAvailabilityGroups(tasks), - tasks.keySet(), + getRunningTasks(), lastCompletedTaskTime.get(), blacklistedUntil.get() ); diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index a8fc53060483..df9e657556d5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.TaskRunnerUtils; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; @@ -47,10 +46,8 @@ import java.net.URL; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -58,7 +55,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; /** */ @@ -133,42 +129,6 @@ public Worker getWorker() return worker; } - private Map getRunningTasks() - { - return tasksSnapshotRef.get().entrySet().stream().filter( - e -> e.getValue().getTaskStatus().isRunnable() - ).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); - } - - private int getCurrCapacityUsed() - { - int currCapacity = 0; - for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { - currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity(); - } - return currCapacity; - } - - private int getCurrParallelIndexCapcityUsed() - { - int 
currParallelIndexCapacityUsed = 0; - for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { - if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) { - currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity(); - } - } - return currParallelIndexCapacityUsed; - } - - private Set getAvailabilityGroups() - { - Set retVal = new HashSet<>(); - for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { - retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup()); - } - return retVal; - } - public DateTime getBlacklistedUntil() { return blacklistedUntil.get(); @@ -201,12 +161,9 @@ public ImmutableWorkerInfo toImmutable() w = disabledWorker; } - return new ImmutableWorkerInfo( + return ImmutableWorkerInfo.fromWorkerAnnouncements( w, - getCurrCapacityUsed(), - getCurrParallelIndexCapcityUsed(), - getAvailabilityGroups(), - getRunningTasks().keySet(), + tasksSnapshotRef.get(), lastCompletedTaskTime.get(), blacklistedUntil.get() ); From adfcbe3aa1b7c26bbc5a9ea04fa41711a60006df Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 7 Jul 2023 05:48:49 -0700 Subject: [PATCH 2/5] Pick better names and use better logic. 
--- .../overlord/ImmutableWorkerInfo.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java index a4cdb33e0ed8..86477910d8a0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; @@ -100,26 +101,33 @@ public static ImmutableWorkerInfo fromWorkerAnnouncements( @Nullable final DateTime blacklistedUntil ) { - int currCapacity = 0; - int currParallelIndexCapacity = 0; + int currCapacityUsed = 0; + int currParallelIndexCapacityUsed = 0; + ImmutableSet.Builder taskIds = ImmutableSet.builder(); ImmutableSet.Builder availabilityGroups = ImmutableSet.builder(); - for (final TaskAnnouncement announcement : announcements.values()) { - currCapacity += announcement.getTaskResource().getRequiredCapacity(); + for (final Map.Entry entry : announcements.entrySet()) { + final String taskId = entry.getKey(); + final TaskAnnouncement announcement = entry.getValue(); + final TaskResource taskResource = announcement.getTaskResource(); + final int requiredCapacity = taskResource.getRequiredCapacity(); + + currCapacityUsed += requiredCapacity; if (ParallelIndexSupervisorTask.TYPE.equals(announcement.getTaskType())) { - currParallelIndexCapacity += announcement.getTaskResource().getRequiredCapacity(); + 
currParallelIndexCapacityUsed += requiredCapacity; } - availabilityGroups.add(announcement.getTaskResource().getAvailabilityGroup()); + taskIds.add(taskId); + availabilityGroups.add(taskResource.getAvailabilityGroup()); } return new ImmutableWorkerInfo( worker, - currCapacity, - currParallelIndexCapacity, + currCapacityUsed, + currParallelIndexCapacityUsed, availabilityGroups.build(), - ImmutableSet.copyOf(announcements.keySet()), + taskIds.build(), lastCompletedTaskTime, blacklistedUntil ); From 893db4715b6037847caac0f3a2ad430bf406aef1 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 7 Jul 2023 05:54:03 -0700 Subject: [PATCH 3/5] Only runnable tasks. --- .../overlord/ImmutableWorkerInfo.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java index 86477910d8a0..829cf79ed441 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -107,19 +107,22 @@ public static ImmutableWorkerInfo fromWorkerAnnouncements( ImmutableSet.Builder availabilityGroups = ImmutableSet.builder(); for (final Map.Entry entry : announcements.entrySet()) { - final String taskId = entry.getKey(); final TaskAnnouncement announcement = entry.getValue(); - final TaskResource taskResource = announcement.getTaskResource(); - final int requiredCapacity = taskResource.getRequiredCapacity(); - currCapacityUsed += requiredCapacity; + if (announcement.getStatus().isRunnable()) { + final String taskId = entry.getKey(); + final TaskResource taskResource = announcement.getTaskResource(); + final int requiredCapacity = taskResource.getRequiredCapacity(); - if (ParallelIndexSupervisorTask.TYPE.equals(announcement.getTaskType())) { - 
currParallelIndexCapacityUsed += requiredCapacity; - } + currCapacityUsed += requiredCapacity; + + if (ParallelIndexSupervisorTask.TYPE.equals(announcement.getTaskType())) { + currParallelIndexCapacityUsed += requiredCapacity; + } - taskIds.add(taskId); - availabilityGroups.add(taskResource.getAvailabilityGroup()); + taskIds.add(taskId); + availabilityGroups.add(taskResource.getAvailabilityGroup()); + } } return new ImmutableWorkerInfo( From 937ad1557dd4fd711f9197b3593e6195f9843ffa Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 7 Jul 2023 13:16:52 -0700 Subject: [PATCH 4/5] Fix test. --- .../overlord/ImmutableWorkerInfo.java | 7 ++++ .../indexing/overlord/RemoteTaskRunner.java | 10 +++++ ...ngTaskBasedWorkerProvisioningStrategy.java | 5 +-- ...kRunnerRunPendingTasksConcurrencyTest.java | 4 +- .../overlord/RemoteTaskRunnerTest.java | 37 +++++++++---------- .../overlord/RemoteTaskRunnerTestUtils.java | 2 +- 6 files changed, 39 insertions(+), 26 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java index 829cf79ed441..b1eafe0dc59d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; import org.apache.druid.guice.annotations.PublicApi; @@ -47,6 +48,8 @@ public class ImmutableWorkerInfo private final ImmutableSet availabilityGroups; private final ImmutableSet runningTasks; private final DateTime lastCompletedTaskTime; + + @Nullable private final DateTime blacklistedUntil; @JsonCreator @@ -94,6 +97,9 
@@ public ImmutableWorkerInfo( this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null); } + /** + * Helper used by {@link ZkWorker} and {@link org.apache.druid.indexing.overlord.hrtr.WorkerHolder}. + */ public static ImmutableWorkerInfo fromWorkerAnnouncements( final Worker worker, final Map announcements, @@ -178,6 +184,7 @@ public DateTime getLastCompletedTaskTime() } @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) public DateTime getBlacklistedUntil() { return blacklistedUntil; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 10e66395996a..f8e27db125b7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -523,6 +523,7 @@ public Optional getScalingStats() return Optional.fromNullable(provisioningService.getStats()); } + @Nullable public ZkWorker findWorkerRunningTask(String taskId) { for (ZkWorker zkWorker : zkWorkers.values()) { @@ -533,6 +534,15 @@ public ZkWorker findWorkerRunningTask(String taskId) return null; } + /** + * Retrieve {@link ZkWorker} based on an ID (host), or null if the ID doesn't exist. 
+ */ + @Nullable + ZkWorker findWorkerId(String workerId) + { + return zkWorkers.get(workerId); + } + public boolean isWorkerRunningTask(ZkWorker worker, String taskId) { return Preconditions.checkNotNull(worker, "worker").isRunningTask(taskId); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index 1c9ba5f48660..6990973e564f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -49,6 +49,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -494,9 +495,7 @@ private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableW ), Sets.union( immutableWorker.getRunningTasks(), - Sets.newHashSet( - task.getId() - ) + Collections.singleton(task.getId()) ), DateTimes.nowUtc() ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java index 0cb978304a5b..62c18d7bd232 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java @@ -126,7 +126,7 @@ public int getPendingTasksRunnerNumThreads() tasks[5] = TestTasks.unending("task5"); results[5] = remoteTaskRunner.run(tasks[5]); 
waitForOneWorkerToHaveUnackedTasks(); - if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) { + if (rtrTestUtils.taskAssigned("worker0", tasks[5].getId())) { rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]); rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]); } else { @@ -138,7 +138,7 @@ public int getPendingTasksRunnerNumThreads() private void mockWorkerRunningAndCompletionSuccessfulTasks(Task t1, Task t2) throws Exception { - if (rtrTestUtils.taskAnnounced("worker0", t1.getId())) { + if (rtrTestUtils.taskAssigned("worker0", t1.getId())) { rtrTestUtils.mockWorkerRunningTask("worker0", t1); rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t1); rtrTestUtils.mockWorkerRunningTask("worker1", t2); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 99b6e39717fa..948681bdfe76 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -101,7 +101,8 @@ public class RemoteTaskRunnerTest private Worker worker; @Rule - public TestRule watcher = new TestWatcher() { + public TestRule watcher = new TestWatcher() + { @Override protected void starting(Description description) { @@ -621,7 +622,7 @@ private void disableWorker() throws Exception private boolean taskAnnounced(final String taskId) { - return rtrTestUtils.taskAnnounced(WORKER_HOST, taskId); + return rtrTestUtils.taskAssigned(WORKER_HOST, taskId); } private boolean workerRunningTask(final String taskId) @@ -890,8 +891,8 @@ public void testBlacklistZKWorkers() throws Exception } /** - * With 2 workers and maxPercentageBlacklistWorkers(25), neither worker should ever be blacklisted even after - * exceeding maxRetriesBeforeBlacklist. 
+ * With 2 workers and maxPercentageBlacklistWorkers(25), no worker should be blacklisted even after exceeding + * maxRetriesBeforeBlacklist. */ @Test public void testBlacklistZKWorkers25Percent() throws Exception @@ -904,8 +905,7 @@ public void testBlacklistZKWorkers25Percent() throws Exception makeRemoteTaskRunner(rtrConfig); - String firstWorker = null; - String secondWorker = null; + String assignedWorker = null; for (int i = 1; i < 13; i++) { String taskId = StringUtils.format("rt-%d", i); @@ -920,26 +920,23 @@ public void testBlacklistZKWorkers25Percent() throws Exception Future taskFuture = remoteTaskRunner.run(task); if (i == 1) { - if (rtrTestUtils.taskAnnounced("worker2", task.getId())) { - firstWorker = "worker2"; - secondWorker = "worker"; + if (rtrTestUtils.taskAssigned("worker2", task.getId())) { + assignedWorker = "worker2"; } else { - firstWorker = "worker"; - secondWorker = "worker2"; + assignedWorker = "worker"; } } - final String expectedWorker = i % 2 == 0 ? secondWorker : firstWorker; - - Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId())); - rtrTestUtils.mockWorkerRunningTask(expectedWorker, task); - rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task); + Assert.assertTrue(rtrTestUtils.taskAssigned(assignedWorker, task.getId())); + rtrTestUtils.mockWorkerRunningTask(assignedWorker, task); + rtrTestUtils.mockWorkerCompleteFailedTask(assignedWorker, task); Assert.assertTrue(taskFuture.get().isFailure()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); Assert.assertEquals( - ((i + 1) / 2), - remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount() + i, + remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount() + + remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount() ); } } @@ -975,7 +972,7 @@ public void testBlacklistZKWorkers50Percent() throws Exception Future taskFuture = remoteTaskRunner.run(task); if (i == 1) { - if 
(rtrTestUtils.taskAnnounced("worker2", task.getId())) { + if (rtrTestUtils.taskAssigned("worker2", task.getId())) { firstWorker = "worker2"; secondWorker = "worker"; } else { @@ -986,7 +983,7 @@ public void testBlacklistZKWorkers50Percent() throws Exception final String expectedWorker = i % 2 == 0 || i > 4 ? secondWorker : firstWorker; - Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId())); + Assert.assertTrue(rtrTestUtils.taskAssigned(expectedWorker, task.getId())); rtrTestUtils.mockWorkerRunningTask(expectedWorker, task); rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index 671184414082..bdf886aa41bd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -219,7 +219,7 @@ boolean workerRunningTask(final String workerId, final String taskId) return pathExists(JOINER.join(STATUS_PATH, workerId, taskId)); } - boolean taskAnnounced(final String workerId, final String taskId) + boolean taskAssigned(final String workerId, final String taskId) { return pathExists(JOINER.join(TASKS_PATH, workerId, taskId)); } From 06987d88f8a5a136f565246ef8d444b308656c5b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 12 Jul 2023 09:28:32 -0700 Subject: [PATCH 5/5] Fix testBlacklistZKWorkers50Percent. 
--- .../overlord/RemoteTaskRunnerTest.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 948681bdfe76..a76766d2268c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -981,17 +981,26 @@ public void testBlacklistZKWorkers50Percent() throws Exception } } - final String expectedWorker = i % 2 == 0 || i > 4 ? secondWorker : firstWorker; + final String expectedWorker = i > 2 ? secondWorker : firstWorker; - Assert.assertTrue(rtrTestUtils.taskAssigned(expectedWorker, task.getId())); + Assert.assertTrue( + StringUtils.format("Task[%s] assigned to worker[%s]", i, expectedWorker), + rtrTestUtils.taskAssigned(expectedWorker, task.getId()) + ); rtrTestUtils.mockWorkerRunningTask(expectedWorker, task); rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task); Assert.assertTrue(taskFuture.get().isFailure()); - Assert.assertEquals(i > 2 ? 1 : 0, remoteTaskRunner.getBlackListedWorkers().size()); Assert.assertEquals( - i > 4 ? i - 2 : ((i + 1) / 2), - remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount() + StringUtils.format("Blacklisted workers after task[%s]", i), + i >= 2 ? 1 : 0, + remoteTaskRunner.getBlackListedWorkers().size() + ); + Assert.assertEquals( + StringUtils.format("Continuously failed tasks after task[%s]", i), + i, + remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount() + + remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount() ); } }