Merged
```diff
@@ -1401,29 +1401,31 @@ public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers)
 {
   // skip the lock and bail early if we should not mark any workers lazy (e.g. number
   // of current workers is at or below the minNumWorkers of autoscaler config)
-  if (maxLazyWorkers < 1) {
-    return Collections.emptyList();
+  if (lazyWorkers.size() >= maxLazyWorkers) {
+    return getLazyWorkers();
```
**Contributor:** I think we should return a sub-list here of size `maxLazyWorkers`. Otherwise, the strategy could potentially terminate all the lazy workers, not just those which are in excess.

This would probably be okay if the value of `maxLazyWorkers` passed in were always the same. In that case, we would only ever mark a worker as lazy if it was in excess of `maxLazyWorkers`. But if the value of `maxLazyWorkers` decreases in subsequent invocations of this method, we would do some extra terminations.

**Contributor Author:** I think we do want to terminate all the lazy workers, because nothing ever goes back and marks lazy workers as non-lazy. It is a permanent status unless there is an Overlord leader change. So, if we don't terminate them, they will just sit around not getting any work assigned to them.

**Contributor:** Hmm, I see. I guess it makes sense to kill it then.

There is, however, `HttpRemoteTaskRunner.syncMonitoring`, where we reset workers that seem to be acting weird. That flow does remove items from the lazy list, to do another fresh retry of syncing to the worker. In the absence of the reset logic, the lazy flow seems a little self-fulfilling and would lead to eager terminations: if we mark a worker as lazy and never assign it anything, it will always be lazy (going by the logic in `ProvisioningUtil.createLazyWorkerPredicate(config)` as an example).

Would it be better to have a timeout after which we retry submitting tasks to the worker, and only after a few repeated retries truly mark the worker for termination? "Lazy" would then become more of a temporary state, and a repeatedly lazy server would finally get blacklisted.

(`HttpRemoteTaskRunner` also already has a list of `blacklistedWorkers`, which seems to be doing its own thing. 😅)

**Contributor Author:** Ah, yeah, okay: nothing except the reset on desync, of course 🙂. And also a worker going completely offline and then coming back; that would also get it removed from the lazy list. So it can happen, but not during normal operation for a stable worker.

The original idea behind "lazy" workers is that they should be terminated ASAP once in the lazy state. The terminology is mostly a joke about how those workers are lazy and haven't done any work in a while. Before being marked "lazy", they would have been in the state "idle" (not running any tasks) for the `workerIdleTimeout`.

On `blacklistedWorkers`, that's a different thing, meant to catch workers that are having problems running tasks, so we don't schedule tasks on them and have them keep failing.

**Contributor:** Makes sense. Thanks for the clarification and for indulging me 🙂.

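The timeout-and-retry alternative floated in the thread could be sketched roughly as follows. This is purely illustrative: `LazyCandidateTracker` and its method names are hypothetical and do not exist in Druid; it only shows "lazy" as a strike count that escalates to termination and is reset by a successful assignment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: treat "lazy" as a temporary state and only escalate a
// worker to termination after repeated lazy observations. Not Druid code.
public class LazyCandidateTracker
{
  private final int maxLazyStrikes;
  private final Map<String, Integer> strikes = new HashMap<>();

  public LazyCandidateTracker(int maxLazyStrikes)
  {
    this.maxLazyStrikes = maxLazyStrikes;
  }

  /** Record one lazy observation; returns true once the worker should be terminated. */
  public boolean recordLazyObservation(String workerHost)
  {
    int count = strikes.merge(workerHost, 1, Integer::sum);
    return count >= maxLazyStrikes;
  }

  /** A successful task assignment resets the worker back to the non-lazy state. */
  public void recordTaskAssigned(String workerHost)
  {
    strikes.remove(workerHost);
  }
}
```

Under this scheme, a worker that picks up a task between provisioning runs never reaches the termination threshold, addressing the "self-fulfilling" concern; the cost is slower scale-down for genuinely idle workers.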
```diff
   }
-  // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
+
+  // Search for new workers to mark lazy.
+  // Status lock is used to prevent any tasks being assigned to workers while we mark them lazy
   synchronized (statusLock) {
     for (Map.Entry<String, ZkWorker> worker : zkWorkers.entrySet()) {
+      if (lazyWorkers.size() >= maxLazyWorkers) {
+        break;
+      }
       final ZkWorker zkWorker = worker.getValue();
       try {
         if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
           log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
           lazyWorkers.put(worker.getKey(), zkWorker);
-          if (lazyWorkers.size() == maxLazyWorkers) {
-            // only mark excess workers as lazy and allow their cleanup
-            break;
-          }
         }
       }
       catch (Exception e) {
         throw new RuntimeException(e);
       }
     }
-    return getWorkerFromZK(lazyWorkers.values());
   }
+
+  return getLazyWorkers();
 }

 protected List<String> getAssignedTasks(Worker worker) throws Exception
```
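To make the reviewer's point about a shrinking `maxLazyWorkers` concrete, here is a minimal, self-contained model of the patched control flow. It uses plain host strings instead of `ZkWorker`/`WorkerHolder` and `java.util.function.Predicate` instead of Guava's, and is a stand-in for illustration, not the actual Druid class:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Simplified model of the new markWorkersLazy control flow.
public class LazyMarkingModel
{
  // host -> currently idle (no assigned tasks)?
  private final Map<String, Boolean> workers = new LinkedHashMap<>();
  // Hosts already marked lazy; as in the patch, entries are never removed.
  private final Set<String> lazyWorkers = new LinkedHashSet<>();

  public void addWorker(String host, boolean idle)
  {
    workers.put(host, idle);
  }

  // Bail early if enough workers are already lazy; otherwise scan for idle
  // workers matching the predicate, stopping once the cap is reached.
  public List<String> markWorkersLazy(Predicate<String> isLazyWorker, int maxLazyWorkers)
  {
    if (lazyWorkers.size() >= maxLazyWorkers) {
      // May return MORE than maxLazyWorkers if a previous call marked more.
      return new ArrayList<>(lazyWorkers);
    }
    for (Map.Entry<String, Boolean> worker : workers.entrySet()) {
      if (lazyWorkers.size() >= maxLazyWorkers) {
        break;
      }
      if (worker.getValue() && isLazyWorker.test(worker.getKey())) {
        lazyWorkers.add(worker.getKey());
      }
    }
    return new ArrayList<>(lazyWorkers);
  }
}
```

Calling `markWorkersLazy(h -> true, 2)` on three idle workers marks two of them; a subsequent call with a maximum of 1 still returns both, which is exactly the "extra terminations" scenario discussed in the thread.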
```diff
@@ -26,6 +26,7 @@
 import org.apache.druid.indexing.worker.Worker;
 
 import java.util.Collection;
+import java.util.List;
 
 @PublicApi
 public interface WorkerTaskRunner extends TaskRunner
@@ -47,10 +48,16 @@ enum ActionType
   Collection<Worker> getLazyWorkers();
 
   /**
-   * Check which workers can be marked as lazy
+   * Mark workers matching a predicate as lazy, up to a maximum. If the number of workers previously marked lazy is
+   * equal to or higher than the provided maximum, this method will return those previously marked workers and will
+   * not mark any additional workers. Workers are never un-marked lazy once they are marked lazy.
+   *
+   * This method is called by {@link org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy}
+   * implementations. It is expected that the lazy workers returned by this method will be terminated using
+   * {@link org.apache.druid.indexing.overlord.autoscaling.AutoScaler#terminate(List)}.
    *
    * @param isLazyWorker   predicate that checks if a worker is lazy
-   * @param maxLazyWorkers maximum number of lazy workers to return
+   * @param maxLazyWorkers desired maximum number of lazy workers (actual number may be higher)
    */
   Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers);
```
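Given that contract, a terminate-side caller might look roughly like the sketch below, with minimal stub interfaces standing in for Druid's `WorkerTaskRunner` and `AutoScaler` (all names here are illustrative). The key point from the updated Javadoc is that the caller must tolerate receiving more than `maxLazyWorkers` hosts:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a caller honoring the markWorkersLazy contract. Stub interfaces
// only; not the real Druid ProvisioningStrategy/AutoScaler types.
public class TerminateSketch
{
  interface Runner
  {
    List<String> markWorkersLazy(int maxLazyWorkers); // returns lazy worker hosts
  }

  interface AutoScaler
  {
    void terminate(List<String> hosts);
  }

  static List<String> terminateLazyWorkers(Runner runner, AutoScaler autoScaler, int currentWorkers, int minNumWorkers)
  {
    // Never scale below the configured floor of workers.
    int maxLazyWorkers = currentWorkers - minNumWorkers;
    if (maxLazyWorkers < 1) {
      return new ArrayList<>();
    }
    // Per the Javadoc, this may return MORE than maxLazyWorkers hosts if
    // earlier invocations already marked more workers lazy.
    List<String> lazyHosts = runner.markWorkersLazy(maxLazyWorkers);
    autoScaler.terminate(lazyHosts);
    return lazyHosts;
  }
}
```

Note that the `maxLazyWorkers < 1` guard now lives in the caller, which is consistent with the diff removing that check from the runner itself.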
```diff
@@ -96,7 +96,6 @@
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -927,29 +926,31 @@ public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers)
 {
   // skip the lock and bail early if we should not mark any workers lazy (e.g. number
   // of current workers is at or below the minNumWorkers of autoscaler config)
-  if (maxLazyWorkers < 1) {
-    return Collections.emptyList();
+  if (lazyWorkers.size() >= maxLazyWorkers) {
+    return getLazyWorkers();
   }
 
+  // Search for new workers to mark lazy.
+  // Status lock is used to prevent any tasks being assigned to workers while we mark them lazy
   synchronized (statusLock) {
     for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
+      if (lazyWorkers.size() >= maxLazyWorkers) {
+        break;
+      }
       final WorkerHolder workerHolder = worker.getValue();
       try {
         if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) {
           log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
           lazyWorkers.put(worker.getKey(), workerHolder);
-          if (lazyWorkers.size() == maxLazyWorkers) {
-            // only mark excess workers as lazy and allow their cleanup
-            break;
-          }
         }
       }
       catch (Exception e) {
         throw new RuntimeException(e);
       }
     }
-    return getLazyWorkers();
   }
+
+  return getLazyWorkers();
 }

 private boolean isWorkerOkForMarkingLazy(Worker worker)
```