Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,12 @@ public Optional<ScalingStats> getScalingStats()
return Optional.absent();
}

/**
* No-op implementation of {@code start()}: there is no state to set up here.
*/
@Override
public void start()
{
// No state setup required
}

@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
Expand All @@ -42,7 +44,6 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.RE;
import com.metamx.common.lifecycle.LifecycleStart;
Expand All @@ -53,7 +54,6 @@
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
Expand Down Expand Up @@ -113,7 +113,7 @@
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{
private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
Expand Down Expand Up @@ -153,7 +153,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final ListeningScheduledExecutorService cleanupExec;

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ResourceManagementStrategy<RemoteTaskRunner> resourceManagement;
private final ResourceManagementStrategy<WorkerTaskRunner> resourceManagement;

public RemoteTaskRunner(
ObjectMapper jsonMapper,
Expand All @@ -164,7 +164,7 @@ public RemoteTaskRunner(
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
ResourceManagementStrategy<RemoteTaskRunner> resourceManagement
ResourceManagementStrategy<WorkerTaskRunner> resourceManagement
)
{
this.jsonMapper = jsonMapper;
Expand All @@ -180,6 +180,7 @@ public RemoteTaskRunner(
this.resourceManagement = resourceManagement;
}

@Override
@LifecycleStart
public void start()
{
Expand Down Expand Up @@ -298,6 +299,7 @@ public void onFailure(Throwable throwable)
}
}

@Override
@LifecycleStop
public void stop()
{
Expand Down Expand Up @@ -325,7 +327,13 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
return ImmutableList.of();
}

public Collection<ZkWorker> getWorkers()
@Override
public Collection<Worker> getWorkers()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drcrallen this change made the overlord /druid/indexer/v1/workers API no longer return a runningTasks key for each worker, which may confuse some folks' rolling update scripts and definitely makes the web UI less useful.

How do you feel about having the OverlordResource specifically call getZkWorkers if the runner is an RTR? Or would it make more sense to add a list of running tasks to the Worker object?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Spoke in chat.) Since io.druid.indexing.overlord.http.OverlordResource#getWorkers already special-cases the RTR, I suggest having it fetch the ZK workers in that case until a more complete solution can be put in place.

{
return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values()));
}

/**
* Returns an immutable copy of the values currently held in {@code zkWorkers}.
*
* @return the {@link ZkWorker}s present in {@code zkWorkers} at the time of the call
*/
public Collection<ZkWorker> getZkWorkers()
{
  final Collection<ZkWorker> snapshot = ImmutableList.copyOf(zkWorkers.values());
  return snapshot;
}
Expand Down Expand Up @@ -1018,7 +1026,8 @@ private void taskComplete(
taskRunnerWorkItem.setResult(taskStatus);
}

public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
@Override
public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
Expand All @@ -1027,7 +1036,7 @@ public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxW
String worker = iterator.next();
ZkWorker zkWorker = zkWorkers.get(worker);
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker)) {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker, zkWorker);
if (lazyWorkers.size() == maxWorkers) {
Expand All @@ -1040,13 +1049,13 @@ public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxW
throw Throwables.propagate(e);
}
}
return ImmutableList.copyOf(lazyWorkers.values());
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}
}

private List<String> getAssignedTasks(Worker worker) throws Exception
protected List<String> getAssignedTasks(Worker worker) throws Exception
{
List<String> assignedTasks = Lists.newArrayList(
final List<String> assignedTasks = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost()))
);

Expand All @@ -1066,9 +1075,25 @@ private List<String> getAssignedTasks(Worker worker) throws Exception
return assignedTasks;
}

public List<ZkWorker> getLazyWorkers()
@Override
public Collection<Worker> getLazyWorkers()
{
return ImmutableList.copyOf(lazyWorkers.values());
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}

/**
* Maps each supplied {@link ZkWorker} to the {@link Worker} it wraps.
*
* The result is produced by {@code Collections2.transform}, so it is a transformed view over
* {@code workers} rather than an independent copy; the callers in this class wrap it in
* {@code ImmutableList.copyOf} when they need a snapshot.
*
* @param workers the ZkWorkers to unwrap
* @return a collection yielding {@code getWorker()} for every element of {@code workers}
*/
public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
  final Function<ZkWorker, Worker> unwrapWorker = new Function<ZkWorker, Worker>()
  {
    @Override
    public Worker apply(ZkWorker zkWorker)
    {
      return zkWorker.getWorker();
    }
  };
  return Collections2.transform(workers, unwrapWorker);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner>
{
public static final String TYPE_NAME = "remote";
private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class);
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
Expand Down Expand Up @@ -83,7 +84,7 @@ public RemoteTaskRunnerFactory(
@Override
public RemoteTaskRunner build()
{
final ResourceManagementStrategy<RemoteTaskRunner> resourceManagementStrategy;
final ResourceManagementStrategy<WorkerTaskRunner> resourceManagementStrategy;
if (resourceManagementSchedulerConfig.isDoAutoscale()) {
resourceManagementStrategy = new SimpleResourceManagementStrategy(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

/**
* Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}.
* Implementations hold state.
*/
public interface TaskRunner
{
Expand Down Expand Up @@ -75,4 +76,9 @@ public interface TaskRunner
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
*/
Optional<ScalingStats> getScalingStats();

/**
* Starts the runner, performing whatever state setup it requires. Runners with no state to set up may implement this as a no-op.
*/
void start();
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private static ListeningExecutorService buildExecutorService(int priority)
);
}

@Override
@LifecycleStop
public void stop()
{
Expand Down Expand Up @@ -259,6 +260,12 @@ public Optional<ScalingStats> getScalingStats()
return Optional.absent();
}

/**
* No-op implementation of {@code start()}: there is no state to start up here.
*/
@Override
public void start()
{
// No state startup required
}

@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;

import com.google.common.base.Predicate;
import io.druid.indexing.worker.Worker;

import java.util.Collection;

/**
* A {@link TaskRunner} that additionally exposes the {@link Worker}s it knows about, and the
* operations for querying and marking workers as "lazy".
*/
public interface WorkerTaskRunner extends TaskRunner
{
/**
* List of known workers who can accept tasks
* @return A list of workers who can accept tasks for running
*/
Collection<Worker> getWorkers();

/**
* Return a list of workers who can be reaped by autoscaling
* @return Workers which can be reaped by autoscaling
*/
Collection<Worker> getLazyWorkers();

/**
* Check which workers can be marked as lazy
* @param isLazyWorker predicate applied to a candidate worker; a worker it accepts may be marked lazy
* @param maxWorkers limit related to how many workers may be lazy at once
*                   (NOTE(review): exact semantics are implementation-defined here; confirm against implementors)
* @return the workers that are marked lazy
*         (NOTE(review): presumably includes workers marked by earlier calls; confirm against implementors)
*/
Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class ZkWorker implements Closeable
private final Function<ChildData, TaskAnnouncement> cacheConverter;

private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());

public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
Expand Down Expand Up @@ -129,7 +128,7 @@ public Set<String> getAvailabilityGroups()
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime.get();
return worker.get().getLastCompletedTaskTime();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see where the lastCompletedTaskTime variable gets deleted?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class field was a leftover value that was accidentally still used in toString; it has been removed.

}

public boolean isRunningTask(String taskId)
Expand All @@ -139,7 +138,7 @@ public boolean isRunningTask(String taskId)

public boolean isValidVersion(String minVersion)
{
return worker.get().getVersion().compareTo(minVersion) >= 0;
return worker.get().isValidVersion(minVersion);
}

public void setWorker(Worker newWorker)
Expand All @@ -153,7 +152,7 @@ public void setWorker(Worker newWorker)

public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime.set(completedTaskTime);
worker.get().setLastCompletedTaskTime(completedTaskTime);
}

public ImmutableZkWorker toImmutable()
Expand All @@ -172,7 +171,6 @@ public String toString()
{
return "ZkWorker{" +
"worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}
}
Loading