Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,11 @@ private void taskComplete(
@Override
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
{
// skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config)
if (maxWorkers < 1) {
return Collections.emptyList();
}
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
Iterator<String> iterator = zkWorkers.keySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,24 @@ public boolean apply(ImmutableWorkerInfo input)
Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
}

@Test
public void testFindLazyWorkerNotRunningAnyTaskButWithZeroMaxWorkers() throws Exception
{
doSetup();
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ImmutableWorkerInfo>()
{
@Override
public boolean apply(ImmutableWorkerInfo input)
{
return true;
}
}, 0
);
Assert.assertEquals(0, lazyworkers.size());
Assert.assertEquals(0, remoteTaskRunner.getLazyWorkers().size());
}

@Test
public void testWorkerZKReconnect() throws Exception
{
Expand Down