Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.overlord;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;

import java.util.Collection;
import java.util.Set;

/**
* A snapshot of a Worker and its current state, i.e. the tasks assigned to that worker.
*/
public class ImmutableWorkerInfo
{
private final Worker worker;
private final int currCapacityUsed;
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;

/**
 * Creates a snapshot from its individual parts. Annotated as the Jackson creator, so each
 * argument is bound to the JSON property of the same name.
 *
 * Both collections are copied into immutable sets; the worker, the capacity value and the
 * timestamp are stored as given.
 *
 * @param worker                the worker this snapshot describes
 * @param currCapacityUsed      the capacity value to record for the worker
 * @param availabilityGroups    availability groups to record; copied
 * @param runningTasks          running task entries to record; copied
 * @param lastCompletedTaskTime the last-completed-task timestamp to record
 */
@JsonCreator
public ImmutableWorkerInfo(
    @JsonProperty("worker") Worker worker,
    @JsonProperty("currCapacityUsed") int currCapacityUsed,
    @JsonProperty("availabilityGroups") Set<String> availabilityGroups,
    @JsonProperty("runningTasks") Collection<String> runningTasks,
    @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
)
{
  // Defensive copies first, in the same order as before.
  this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
  this.runningTasks = ImmutableSet.copyOf(runningTasks);

  // Plain references and primitives.
  this.lastCompletedTaskTime = lastCompletedTaskTime;
  this.currCapacityUsed = currCapacityUsed;
  this.worker = worker;
}

/**
 * Returns the worker this snapshot describes; serialized as the "worker" JSON property.
 */
@JsonProperty("worker")
public Worker getWorker()
{
return worker;
}

/**
 * Returns the used-capacity value recorded in this snapshot; serialized as the
 * "currCapacityUsed" JSON property. {@link #canRunTask} subtracts it from the worker's
 * capacity to decide whether a task fits.
 */
@JsonProperty("currCapacityUsed")
public int getCurrCapacityUsed()
{
return currCapacityUsed;
}

/**
 * Returns the immutable set of availability groups recorded in this snapshot; serialized
 * as the "availabilityGroups" JSON property. {@link #canRunTask} rejects a task whose
 * availability group is already contained in this set.
 */
@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
return availabilityGroups;
}

/**
 * Returns the immutable set of running task entries recorded in this snapshot; serialized
 * as the "runningTasks" JSON property.
 */
@JsonProperty("runningTasks")
public Set<String> getRunningTasks()
{
return runningTasks;
}

/**
 * Returns the last-completed-task timestamp carried by this snapshot, exactly as it was
 * passed to the constructor; serialized as the "lastCompletedTaskTime" JSON property.
 */
@JsonProperty("lastCompletedTaskTime")
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime;
}

/**
 * Tells whether the worker's version is at least {@code minVersion}, according to the
 * ordering defined by the version's {@code compareTo}.
 *
 * @param minVersion the lowest acceptable version
 * @return true when the worker's version compares greater than or equal to
 *         {@code minVersion}
 */
public boolean isValidVersion(String minVersion)
{
  final int comparison = worker.getVersion().compareTo(minVersion);
  return comparison >= 0;
}

/**
 * Decides whether the given task could be placed on this worker, based on this snapshot.
 *
 * A task is accepted only if the worker's remaining capacity (capacity minus used
 * capacity) covers the task's required capacity, and the task's availability group is not
 * already present among this worker's availability groups.
 *
 * @param task the task to check
 * @return true if both conditions hold
 */
public boolean canRunTask(Task task)
{
  // Capacity is checked first; the availability group is only consulted when the task fits.
  if (worker.getCapacity() - getCurrCapacityUsed() < task.getTaskResource().getRequiredCapacity()) {
    return false;
  }
  return !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup());
}

/**
 * Compares this snapshot with another object field by field: used capacity, worker,
 * availability groups, running tasks and last-completed-task time.
 *
 * The constructor stores {@code worker} and {@code lastCompletedTaskTime} without a null
 * check (for example when the JSON property is absent), so both are compared null-safely
 * here instead of being dereferenced unconditionally.
 *
 * @param o the object to compare with
 * @return true if {@code o} is an ImmutableWorkerInfo of the same class with equal fields
 */
@Override
public boolean equals(Object o)
{
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }

  ImmutableWorkerInfo that = (ImmutableWorkerInfo) o;

  if (currCapacityUsed != that.currCapacityUsed) {
    return false;
  }
  if (worker != null ? !worker.equals(that.worker) : that.worker != null) {
    return false;
  }
  // The two sets are never null: the constructor copies them with ImmutableSet.copyOf.
  if (!availabilityGroups.equals(that.availabilityGroups)) {
    return false;
  }
  if (!runningTasks.equals(that.runningTasks)) {
    return false;
  }
  return lastCompletedTaskTime != null
         ? lastCompletedTaskTime.equals(that.lastCompletedTaskTime)
         : that.lastCompletedTaskTime == null;
}

/**
 * Hash code over the same fields used by {@link #equals(Object)}. A null {@code worker} or
 * {@code lastCompletedTaskTime} contributes 0; for non-null fields the result is the same
 * as before.
 *
 * @return the combined hash of all fields
 */
@Override
public int hashCode()
{
  int result = worker != null ? worker.hashCode() : 0;
  result = 31 * result + currCapacityUsed;
  result = 31 * result + availabilityGroups.hashCode();
  result = 31 * result + runningTasks.hashCode();
  result = 31 * result + (lastCompletedTaskTime != null ? lastCompletedTaskTime.hashCode() : 0);
  return result;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this have a toString too? or it doesn't matter?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added toString


/**
 * Renders every field of the snapshot as {@code ImmutableWorkerInfo{name=value, ...}}.
 *
 * @return a human-readable description of this snapshot
 */
@Override
public String toString()
{
  final StringBuilder text = new StringBuilder("ImmutableWorkerInfo{");
  text.append("worker=").append(worker);
  text.append(", currCapacityUsed=").append(currCapacityUsed);
  text.append(", availabilityGroups=").append(availabilityGroups);
  text.append(", runningTasks=").append(runningTasks);
  text.append(", lastCompletedTaskTime=").append(lastCompletedTaskTime);
  text.append('}');
  return text.toString();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@
* creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
* Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
* The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
* <p/>
* <p>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did these doc changes need to happen?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's done by IntelliJ as part of the code formatting, so it's probably fine to go along with it.

* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* <p/>
* <p>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
* <p/>
* <p>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
Expand Down Expand Up @@ -365,14 +365,9 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
}

@Override
public Collection<Worker> getWorkers()
public Collection<ImmutableWorkerInfo> getWorkers()
{
return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values()));
}

public Collection<ZkWorker> getZkWorkers()
{
return ImmutableList.copyOf(zkWorkers.values());
return getImmutableWorkerFromZK(zkWorkers.values());
}

@Override
Expand Down Expand Up @@ -672,7 +667,7 @@ private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem ta

ZkWorker assignedWorker = null;
try {
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
final Optional<ImmutableWorkerInfo> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
Expand All @@ -687,10 +682,10 @@ public boolean apply(Map.Entry<String, ZkWorker> input)
}
}
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>()
new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
{
@Override
public ImmutableZkWorker transformEntry(
public ImmutableWorkerInfo transformEntry(
String key, ZkWorker value
)
{
Expand All @@ -712,7 +707,8 @@ public ImmutableZkWorker transformEntry(

log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return false;
} finally {
}
finally {
if (assignedWorker != null) {
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
// note that this is essential as a task might not get a worker because a worker was assigned another task.
Expand Down Expand Up @@ -1092,7 +1088,7 @@ private void taskComplete(
}

@Override
public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
Expand All @@ -1101,7 +1097,7 @@ public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int ma
String worker = iterator.next();
ZkWorker zkWorker = zkWorkers.get(worker);
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker, zkWorker);
if (lazyWorkers.size() == maxWorkers) {
Expand Down Expand Up @@ -1146,6 +1142,23 @@ public Collection<Worker> getLazyWorkers()
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}

/**
 * Converts each given ZkWorker to its immutable snapshot via {@code toImmutable()} and
 * returns the snapshots as an immutable list, in the iteration order of {@code workers}.
 *
 * @param workers the workers to snapshot
 * @return an immutable list with one ImmutableWorkerInfo per input worker
 */
private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> workers)
{
  final ImmutableList.Builder<ImmutableWorkerInfo> snapshots = ImmutableList.builder();
  for (ZkWorker zkWorker : workers) {
    snapshots.add(zkWorker.toImmutable());
  }
  return snapshots.build();
}

public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
return Collections2.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface WorkerTaskRunner extends TaskRunner
* List of known workers who can accept tasks
* @return A list of workers who can accept tasks for running
*/
Collection<Worker> getWorkers();
Collection<ImmutableWorkerInfo> getWorkers();

/**
* Return a list of workers who can be reaped by autoscaling
Expand All @@ -43,5 +43,5 @@ public interface WorkerTaskRunner extends TaskRunner
* @param maxWorkers
* @return
*/
Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers);
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ZkWorker implements Closeable
private final Function<ChildData, TaskAnnouncement> cacheConverter;

private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());

public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
Expand Down Expand Up @@ -128,7 +129,7 @@ public Set<String> getAvailabilityGroups()
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return worker.get().getLastCompletedTaskTime();
return lastCompletedTaskTime.get();
}

public boolean isRunningTask(String taskId)
Expand All @@ -138,7 +139,7 @@ public boolean isRunningTask(String taskId)

public boolean isValidVersion(String minVersion)
{
return worker.get().isValidVersion(minVersion);
return worker.get().getVersion().compareTo(minVersion) >= 0;
}

public void setWorker(Worker newWorker)
Expand All @@ -152,12 +153,13 @@ public void setWorker(Worker newWorker)

public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
worker.get().setLastCompletedTaskTime(completedTaskTime);
lastCompletedTaskTime.set(completedTaskTime);
}

public ImmutableZkWorker toImmutable()
public ImmutableWorkerInfo toImmutable()
{
return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups());

return new ImmutableWorkerInfo(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups(), getRunningTaskIds(), lastCompletedTaskTime.get());
}

@Override
Expand All @@ -171,6 +173,7 @@ public String toString()
{
return "ZkWorker{" +
"worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}
}
Loading