Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1397,11 +1397,11 @@ private void taskComplete(
}

@Override
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers)
{
// skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config)
if (maxWorkers < 1) {
if (maxLazyWorkers < 1) {
return Collections.emptyList();
}
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
Expand All @@ -1412,7 +1412,7 @@ public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyW
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker.getKey(), zkWorker);
if (lazyWorkers.size() == maxWorkers) {
if (lazyWorkers.size() == maxLazyWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ enum ActionType

/**
* Check which workers can be marked as lazy
*
* @param isLazyWorker predicate that checks if a worker is lazy
* @param maxLazyWorkers maximum number of lazy workers to return
*/
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This method name is probably not accurate anymore, because it does not mark workers as lazy. It gets the list of workers which are lazy and are safe to terminate. So maybe getLazyWorkersToTerminate?

Suggested change
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers);
Collection<Worker> getLazyWorkersToTerminate(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxToTerminate);

Copy link
Copy Markdown
Contributor Author

@gianm gianm Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree the name is funny, but I didn't want to rename it since it's an interface method. I figured for interfaces that people might be using in extensions, best to retain existing signatures unless there is an important functional reason to change them.


WorkerTaskRunnerConfig getConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -922,16 +923,22 @@ public Collection<Worker> getLazyWorkers()
}

@Override
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers)
{
// skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config)
if (maxLazyWorkers < 1) {
return Collections.emptyList();
}

synchronized (statusLock) {
for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
final WorkerHolder workerHolder = worker.getValue();
try {
if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
lazyWorkers.put(worker.getKey(), workerHolder);
if (lazyWorkers.size() == maxWorkers) {
if (lazyWorkers.size() == maxLazyWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,17 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo
Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());

Assert.assertEquals(
Collections.emptyList(),
taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 0)
);

Assert.assertEquals(
"host3:8080",
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 1))
.getHost()
);

Assert.assertEquals(
"host3:8080",
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))
Expand Down