diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 42ff3c246763..c3c00e2f0e6f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -31,7 +31,6 @@ import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.FutureCallback; @@ -100,6 +99,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -464,8 +464,24 @@ public RemoteTaskRunnerConfig getConfig() @Override public Collection getKnownTasks() { - // Racey, since there is a period of time during assignment when a task is neither pending nor running - return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values())); + // Use a map to dedupe tasks, since they may transition from one state to another while this method is iterating + // through the various collections. + final Map items = new LinkedHashMap<>(); + + // Racey, since there is a period of time during assignment when a task is neither pending nor running. + for (RemoteTaskRunnerWorkItem item : pendingTasks.values()) { + items.put(item.getTaskId(), item); + } + + for (RemoteTaskRunnerWorkItem item : runningTasks.values()) { + items.put(item.getTaskId(), item); + } + + for (RemoteTaskRunnerWorkItem item : completeTasks.values()) { + items.put(item.getTaskId(), item); + } + + return ImmutableList.copyOf(items.values()); } @Nullable