Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
Expand All @@ -44,6 +48,8 @@ public class ImmutableWorkerInfo
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;

@Nullable
private final DateTime blacklistedUntil;

@JsonCreator
Expand Down Expand Up @@ -76,7 +82,8 @@ public ImmutableWorkerInfo(
)
{
this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups,
runningTasks, lastCompletedTaskTime, null);
runningTasks, lastCompletedTaskTime, null
);
}

public ImmutableWorkerInfo(
Expand All @@ -90,6 +97,51 @@ public ImmutableWorkerInfo(
this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
}

/**
* Helper used by {@link ZkWorker} and {@link org.apache.druid.indexing.overlord.hrtr.WorkerHolder}.
*/
public static ImmutableWorkerInfo fromWorkerAnnouncements(
final Worker worker,
final Map<String, TaskAnnouncement> announcements,
final DateTime lastCompletedTaskTime,
@Nullable final DateTime blacklistedUntil
)
{
int currCapacityUsed = 0;
int currParallelIndexCapacityUsed = 0;
ImmutableSet.Builder<String> taskIds = ImmutableSet.builder();
ImmutableSet.Builder<String> availabilityGroups = ImmutableSet.builder();

for (final Map.Entry<String, TaskAnnouncement> entry : announcements.entrySet()) {
final TaskAnnouncement announcement = entry.getValue();

if (announcement.getStatus().isRunnable()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query: WorkerHolder was already doing this filtering but ZkWorkerHolder seems to be using all the announcements (unless the filtering happens there at some prior stage). Was it performing incorrect computations of capacity and the other parameters?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I didn't notice this behavioral difference til now. I think the filtering on isRunnable is the correct thing to do: tasks don't use slots when they aren't running, so they shouldn't be counted.

With this change, some RemoteTaskRunnerTest cases need to be adjusted, but I think that's OK. Note that in most cases, tasks get removed from the announcement set pretty quickly after they complete, so I don't expect a big practical difference in prod.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, filtering here makes sense.

final String taskId = entry.getKey();
final TaskResource taskResource = announcement.getTaskResource();
final int requiredCapacity = taskResource.getRequiredCapacity();

currCapacityUsed += requiredCapacity;

if (ParallelIndexSupervisorTask.TYPE.equals(announcement.getTaskType())) {
currParallelIndexCapacityUsed += requiredCapacity;
}

taskIds.add(taskId);
availabilityGroups.add(taskResource.getAvailabilityGroup());
}
}

return new ImmutableWorkerInfo(
worker,
currCapacityUsed,
currParallelIndexCapacityUsed,
availabilityGroups.build(),
taskIds.build(),
lastCompletedTaskTime,
blacklistedUntil
);
}

@JsonProperty("worker")
public Worker getWorker()
{
Expand Down Expand Up @@ -132,6 +184,7 @@ public DateTime getLastCompletedTaskTime()
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public DateTime getBlacklistedUntil()
{
return blacklistedUntil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ public Optional<ScalingStats> getScalingStats()
return Optional.fromNullable(provisioningService.getStats());
}

@Nullable
public ZkWorker findWorkerRunningTask(String taskId)
{
for (ZkWorker zkWorker : zkWorkers.values()) {
Expand All @@ -533,6 +534,15 @@ public ZkWorker findWorkerRunningTask(String taskId)
return null;
}

/**
* Retrieve {@link ZkWorker} based on an ID (host), or null if the ID doesn't exist.
*/
@Nullable
ZkWorker findWorkerId(String workerId)
{
return zkWorkers.get(workerId);
}

public boolean isWorkerRunningTask(ZkWorker worker, String taskId)
{
return Preconditions.checkNotNull(worker, "worker").isRunningTask(taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,9 @@ public void setBlacklistedUntil(DateTime blacklistedUntil)

public ImmutableWorkerInfo toImmutable()
{
Map<String, TaskAnnouncement> tasks = getRunningTasks();

return new ImmutableWorkerInfo(
return ImmutableWorkerInfo.fromWorkerAnnouncements(
worker.get(),
getCurrCapacityUsed(tasks),
getCurrParallelIndexCapacityUsed(tasks),
getAvailabilityGroups(tasks),
tasks.keySet(),
getRunningTasks(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -494,9 +495,7 @@ private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableW
),
Sets.union(
immutableWorker.getRunningTasks(),
Sets.newHashSet(
task.getId()
)
Collections.singleton(task.getId())
),
DateTimes.nowUtc()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
Expand All @@ -47,18 +46,15 @@

import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
*/
Expand Down Expand Up @@ -133,42 +129,6 @@ public Worker getWorker()
return worker;
}

private Map<String, TaskAnnouncement> getRunningTasks()
{
return tasksSnapshotRef.get().entrySet().stream().filter(
e -> e.getValue().getTaskStatus().isRunnable()
).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}

private int getCurrCapacityUsed()
{
int currCapacity = 0;
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
return currCapacity;
}

private int getCurrParallelIndexCapcityUsed()
{
int currParallelIndexCapacityUsed = 0;
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
}
return currParallelIndexCapacityUsed;
}

private Set<String> getAvailabilityGroups()
{
Set<String> retVal = new HashSet<>();
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup());
}
return retVal;
}

public DateTime getBlacklistedUntil()
{
return blacklistedUntil.get();
Expand Down Expand Up @@ -201,12 +161,9 @@ public ImmutableWorkerInfo toImmutable()
w = disabledWorker;
}

return new ImmutableWorkerInfo(
return ImmutableWorkerInfo.fromWorkerAnnouncements(
w,
getCurrCapacityUsed(),
getCurrParallelIndexCapcityUsed(),
getAvailabilityGroups(),
getRunningTasks().keySet(),
tasksSnapshotRef.get(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public int getPendingTasksRunnerNumThreads()
tasks[5] = TestTasks.unending("task5");
results[5] = remoteTaskRunner.run(tasks[5]);
waitForOneWorkerToHaveUnackedTasks();
if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) {
if (rtrTestUtils.taskAssigned("worker0", tasks[5].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]);
} else {
Expand All @@ -138,7 +138,7 @@ public int getPendingTasksRunnerNumThreads()

private void mockWorkerRunningAndCompletionSuccessfulTasks(Task t1, Task t2) throws Exception
{
if (rtrTestUtils.taskAnnounced("worker0", t1.getId())) {
if (rtrTestUtils.taskAssigned("worker0", t1.getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", t1);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t1);
rtrTestUtils.mockWorkerRunningTask("worker1", t2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public class RemoteTaskRunnerTest
private Worker worker;

@Rule
public TestRule watcher = new TestWatcher() {
public TestRule watcher = new TestWatcher()
{
@Override
protected void starting(Description description)
{
Expand Down Expand Up @@ -621,7 +622,7 @@ private void disableWorker() throws Exception

private boolean taskAnnounced(final String taskId)
{
return rtrTestUtils.taskAnnounced(WORKER_HOST, taskId);
return rtrTestUtils.taskAssigned(WORKER_HOST, taskId);
}

private boolean workerRunningTask(final String taskId)
Expand Down Expand Up @@ -890,8 +891,8 @@ public void testBlacklistZKWorkers() throws Exception
}

/**
* With 2 workers and maxPercentageBlacklistWorkers(25), neither worker should ever be blacklisted even after
* exceeding maxRetriesBeforeBlacklist.
* With 2 workers and maxPercentageBlacklistWorkers(25), no worker should be blacklisted even after exceeding
* maxRetriesBeforeBlacklist.
*/
@Test
public void testBlacklistZKWorkers25Percent() throws Exception
Expand All @@ -904,8 +905,7 @@ public void testBlacklistZKWorkers25Percent() throws Exception

makeRemoteTaskRunner(rtrConfig);

String firstWorker = null;
String secondWorker = null;
String assignedWorker = null;

for (int i = 1; i < 13; i++) {
String taskId = StringUtils.format("rt-%d", i);
Expand All @@ -920,26 +920,23 @@ public void testBlacklistZKWorkers25Percent() throws Exception
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);

if (i == 1) {
if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
firstWorker = "worker2";
secondWorker = "worker";
if (rtrTestUtils.taskAssigned("worker2", task.getId())) {
assignedWorker = "worker2";
} else {
firstWorker = "worker";
secondWorker = "worker2";
assignedWorker = "worker";
}
}

final String expectedWorker = i % 2 == 0 ? secondWorker : firstWorker;

Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
Assert.assertTrue(rtrTestUtils.taskAssigned(assignedWorker, task.getId()));
rtrTestUtils.mockWorkerRunningTask(assignedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(assignedWorker, task);

Assert.assertTrue(taskFuture.get().isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(
((i + 1) / 2),
remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
i,
remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount()
+ remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount()
);
}
}
Expand Down Expand Up @@ -975,7 +972,7 @@ public void testBlacklistZKWorkers50Percent() throws Exception
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);

if (i == 1) {
if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
if (rtrTestUtils.taskAssigned("worker2", task.getId())) {
firstWorker = "worker2";
secondWorker = "worker";
} else {
Expand All @@ -984,17 +981,26 @@ public void testBlacklistZKWorkers50Percent() throws Exception
}
}

final String expectedWorker = i % 2 == 0 || i > 4 ? secondWorker : firstWorker;
final String expectedWorker = i > 2 ? secondWorker : firstWorker;

Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
Assert.assertTrue(
StringUtils.format("Task[%s] assigned to worker[%s]", i, expectedWorker),
rtrTestUtils.taskAssigned(expectedWorker, task.getId())
);
rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);

Assert.assertTrue(taskFuture.get().isFailure());
Assert.assertEquals(i > 2 ? 1 : 0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(
i > 4 ? i - 2 : ((i + 1) / 2),
remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
StringUtils.format("Blacklisted workers after task[%s]", i),
i >= 2 ? 1 : 0,
remoteTaskRunner.getBlackListedWorkers().size()
);
Assert.assertEquals(
StringUtils.format("Continuously failed tasks after task[%s]", i),
i,
remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount()
+ remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ boolean workerRunningTask(final String workerId, final String taskId)
return pathExists(JOINER.join(STATUS_PATH, workerId, taskId));
}

boolean taskAnnounced(final String workerId, final String taskId)
boolean taskAssigned(final String workerId, final String taskId)
{
return pathExists(JOINER.join(TASKS_PATH, workerId, taskId));
}
Expand Down