Merged
@@ -52,6 +52,7 @@
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.LogUtils;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.query.DruidMetrics;
@@ -542,9 +543,9 @@ public Collection<TaskRunnerWorkItem> getKnownTasks()
}

@Override
public Collection<ZkWorker> getWorkers()
public Optional<ScalingStats> getScalingStats()
{
return ImmutableList.of();
return Optional.absent();
}

@Override
@@ -31,8 +31,8 @@
import java.util.Properties;

/**
*/
public class ForkingTaskRunnerFactory implements TaskRunnerFactory
*/
public class ForkingTaskRunnerFactory implements TaskRunnerFactory<ForkingTaskRunner>
{
private final ForkingTaskRunnerConfig config;
private final TaskConfig taskConfig;
@@ -51,7 +51,8 @@ public ForkingTaskRunnerFactory(
final ObjectMapper jsonMapper,
final TaskLogPusher persistentTaskLogs,
@Self DruidNode node
) {
)
{
this.config = config;
this.taskConfig = taskConfig;
this.workerConfig = workerConfig;
@@ -62,7 +63,7 @@ public ForkingTaskRunnerFactory(
}

@Override
public TaskRunner build()
public ForkingTaskRunner build()
{
return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node);
}
@@ -52,9 +52,12 @@
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
@@ -102,7 +105,6 @@
* <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties.
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
@@ -149,6 +151,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final ListeningScheduledExecutorService cleanupExec;

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ResourceManagementStrategy<RemoteTaskRunner> resourceManagement;

public RemoteTaskRunner(
ObjectMapper jsonMapper,
@@ -158,7 +161,8 @@ public RemoteTaskRunner(
PathChildrenCacheFactory pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec
ScheduledExecutorService cleanupExec,
ResourceManagementStrategy<RemoteTaskRunner> resourceManagement
)
{
this.jsonMapper = jsonMapper;
@@ -171,6 +175,7 @@ public RemoteTaskRunner(
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
this.resourceManagement = resourceManagement;
}

@LifecycleStart
@@ -283,6 +288,7 @@ public void onFailure(Throwable throwable)
waitingForMonitor.wait();
}
}
resourceManagement.startManagement(this);
started = true;
}
catch (Exception e) {
@@ -298,6 +304,9 @@ public void stop()
return;
}
started = false;

resourceManagement.stopManagement();

for (ZkWorker zkWorker : zkWorkers.values()) {
zkWorker.close();
}
@@ -314,7 +323,6 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
return ImmutableList.of();
}

@Override
public Collection<ZkWorker> getWorkers()
{
return ImmutableList.copyOf(zkWorkers.values());
@@ -339,6 +347,12 @@ public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
}

@Override
public Optional<ScalingStats> getScalingStats()
{
return Optional.of(resourceManagement.getStats());
}

public ZkWorker findWorkerRunningTask(String taskId)
{
for (ZkWorker zkWorker : zkWorkers.values()) {
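The diff above shows only the RemoteTaskRunner side of the new strategy contract: startManagement(this) on start, stopManagement() on stop, and getStats() for reporting. A do-nothing implementation of that contract might look roughly like the sketch below; it is inferred from those three calls only, it is not the project's actual NoopResourceManagementStrategy, and the ScalingStats constructor used here is an assumption.

```java
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;

// Hypothetical no-op strategy, sketched only from the calls visible in this diff.
public class NoScalingStrategySketch<T extends TaskRunner> implements ResourceManagementStrategy<T>
{
  @Override
  public void startManagement(T runner)
  {
    // nothing to schedule; this runner's capacity is not managed dynamically
  }

  @Override
  public void stopManagement()
  {
    // nothing to tear down
  }

  @Override
  public ScalingStats getStats()
  {
    // assumes a capacity-style constructor; the real ScalingStats API may differ
    return new ScalingStats(0);
  }
}
```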
@@ -23,9 +23,16 @@
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.server.initialization.IndexerZkConfig;
@@ -35,15 +42,19 @@

/**
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner>
{
private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class);
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
private final IndexerZkConfig zkPaths;
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScheduledExecutorService cleanupExec;
private final SimpleResourceManagementConfig config;
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ScheduledExecutorService exec;

@Inject
public RemoteTaskRunnerFactory(
@@ -53,7 +64,10 @@ public RemoteTaskRunnerFactory(
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorFactory factory
final ScheduledExecutorFactory factory,
final SimpleResourceManagementConfig config,
final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
final ScheduledExecutorService exec
Contributor:

@drcrallen are you able to start overlord after this change? Guice does not seem to be able to inject "ScheduledExecutorService".

)
{
this.curator = curator;
@@ -62,12 +76,26 @@ public RemoteTaskRunnerFactory(
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = factory.create(1,"RemoteTaskRunner-Scheduled-Cleanup--%d");
this.cleanupExec = factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d");
this.config = config;
this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
this.exec = exec;
}

@Override
public TaskRunner build()
public RemoteTaskRunner build()
{
final ResourceManagementStrategy<RemoteTaskRunner> resourceManagementStrategy;
if (resourceManagementSchedulerConfig.isDoAutoscale()) {
resourceManagementStrategy = new SimpleResourceManagementStrategy(
config,
workerConfigRef,
resourceManagementSchedulerConfig,
exec
);
} else {
resourceManagementStrategy = new NoopResourceManagementStrategy<>();
}
return new RemoteTaskRunner(
jsonMapper,
remoteTaskRunnerConfig,
@@ -79,7 +107,8 @@ public TaskRunner build()
.build(),
httpClient,
workerConfigRef,
cleanupExec
cleanupExec,
resourceManagementStrategy
);
}
}
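On the injection question raised in the inline comment above: Guice can only satisfy the new ScheduledExecutorService constructor parameter if some module binds or provides one. A minimal, hypothetical provider might look like the following; the module name, method name, and pool size are assumptions, not part of this PR.

```java
import com.google.inject.AbstractModule;
import com.google.inject.Provides;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical module: without a binding like this somewhere on the injector,
// the new constructor parameter cannot be satisfied and the overlord fails to start.
public class HypotheticalScalingExecModule extends AbstractModule
{
  @Override
  protected void configure()
  {
    // no explicit bindings; the @Provides method below supplies the executor
  }

  @Provides
  public ScheduledExecutorService provideScalingExec()
  {
    // a single-threaded scheduled executor; the pool size is an assumption
    return Executors.newScheduledThreadPool(1);
  }
}
```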
@@ -22,8 +22,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@@ -34,8 +32,7 @@
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.server.DruidNode;
import io.druid.server.initialization.IndexerZkConfig;
@@ -64,7 +61,6 @@ public class TaskMaster
private volatile boolean leading = false;
private volatile TaskRunner taskRunner;
private volatile TaskQueue taskQueue;
private volatile ResourceManagementScheduler resourceManagementScheduler;

private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);

@@ -77,7 +73,6 @@ public TaskMaster(
@Self final DruidNode node,
final IndexerZkConfig zkPaths,
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final ServiceEmitter emitter
@@ -118,14 +113,6 @@ public void takeLeadership(CuratorFramework client) throws Exception
.emit();
}
leaderLifecycle.addManagedInstance(taskRunner);
if (taskRunner instanceof RemoteTaskRunner) {
final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle);
resourceManagementScheduler = managementSchedulerFactory.build(
(RemoteTaskRunner) taskRunner,
executorFactory
);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
@@ -285,10 +272,10 @@ public Optional<TaskActionClient> getTaskActionClient(Task task)
}
}

public Optional<ResourceManagementScheduler> getResourceManagementScheduler()
public Optional<ScalingStats> getScalingStats()
{
if (leading) {
return Optional.fromNullable(resourceManagementScheduler);
return taskRunner.getScalingStats();
} else {
return Optional.absent();
}
@@ -19,10 +19,12 @@

package io.druid.indexing.overlord;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.Pair;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;

import java.util.Collection;
import java.util.List;
@@ -45,27 +47,32 @@ public interface TaskRunner
*
* @return task status, eventually
*/
public ListenableFuture<TaskStatus> run(Task task);
ListenableFuture<TaskStatus> run(Task task);

/**
* Inform the task runner it can clean up any resources associated with a task. This implies shutdown of any
* currently-running tasks.
*
* @param taskid task ID to clean up resources for
*/
public void shutdown(String taskid);
void shutdown(String taskid);

/**
* Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling
* stop, "run" will not accept further tasks.
*/
public void stop();
void stop();

public Collection<? extends TaskRunnerWorkItem> getRunningTasks();
Collection<? extends TaskRunnerWorkItem> getRunningTasks();

public Collection<? extends TaskRunnerWorkItem> getPendingTasks();
Collection<? extends TaskRunnerWorkItem> getPendingTasks();

public Collection<? extends TaskRunnerWorkItem> getKnownTasks();
Collection<? extends TaskRunnerWorkItem> getKnownTasks();

public Collection<ZkWorker> getWorkers();
/**
* Some runners are able to scale up and down their capacity in a dynamic manner. This returns stats on those activities.
*
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
*/
Optional<ScalingStats> getScalingStats();
Contributor:

Similar to getWorkers(), getScalingStats() is also tied to some specific TaskRunners, and ScalingStats looks much more related to autoscaling. Not sure if we are introducing the same kind of coupling again via this.

Contributor (Author):

My issue here is how to handle io.druid.indexing.overlord.http.OverlordResource#getScalingState.

This is intended to be a baby step.

}
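To make the new contract concrete, a hypothetical caller, for example something like the overlord HTTP resource mentioned in the comments above, might consume getScalingStats() as sketched below; the class and method names here are assumptions, not from the PR.

```java
import com.google.common.base.Optional;

import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.autoscaling.ScalingStats;

// Hypothetical consumer of the new TaskRunner#getScalingStats() contract.
public class ScalingStatsReporter
{
  private final TaskRunner runner;

  public ScalingStatsReporter(TaskRunner runner)
  {
    this.runner = runner;
  }

  public String describe()
  {
    final Optional<ScalingStats> stats = runner.getScalingStats();
    // Runners with no dynamic capacity (the forking and thread-pool runners in this
    // diff) return Optional.absent(), so callers must handle the empty case.
    return stats.isPresent() ? stats.get().toString() : "autoscaling not available";
  }
}
```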
@@ -19,7 +19,7 @@

package io.druid.indexing.overlord;

public interface TaskRunnerFactory
public interface TaskRunnerFactory<T extends TaskRunner>
{
public TaskRunner build();
T build();
}
@@ -19,11 +19,11 @@

package io.druid.indexing.overlord;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -33,7 +33,6 @@
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.concurrent.Execs;
@@ -42,6 +41,7 @@
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@@ -204,9 +204,9 @@ public Collection<TaskRunnerWorkItem> getKnownTasks()
}

@Override
public Collection<ZkWorker> getWorkers()
public Optional<ScalingStats> getScalingStats()
{
return Lists.newArrayList();
return Optional.absent();
}

@Override