Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.IndexTask;
Expand Down Expand Up @@ -233,6 +234,14 @@ static boolean isReady(TaskActionClient actionClient, SortedSet<Interval> interv
return true;
}

/**
 * Forwards a graceful-stop request to the runner, if one has been created.
 * When {@code runner} is still null there is nothing to stop and the call is a no-op.
 *
 * @param taskConfig task configuration supplied by the caller; not used by this implementation
 */
@Override
public void stopGracefully(TaskConfig taskConfig)
{
  if (runner == null) {
    // No runner exists yet, so there is nothing to shut down.
    return;
  }
  runner.stopGracefully();
}

@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public interface ParallelIndexTaskRunner<T extends Task>
*/
TaskState run() throws Exception;

/**
* Stop this runner gracefully. This method is called when the task is killed.
* See {@link org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner#stop}.
*
* NOTE(review): presumably this can be invoked from a thread other than the one executing {@link #run()},
* so implementations should be safe to call concurrently with it. Confirm against the callers.
*/
void stopGracefully();

/**
* {@link PushedSegmentsReport} is the report sent by {@link ParallelIndexSubTask}s. The subTasks call this method to
* send their reports after pushing generated segments to deep storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunn
/** subTaskId -> report */
private final ConcurrentHashMap<String, PushedSegmentsReport> segmentsMap = new ConcurrentHashMap<>();

private volatile boolean stopped;
private volatile boolean subTaskScheduleAndMonitorStopped;
private volatile TaskMonitor<ParallelIndexSubTask> taskMonitor;

private int nextSpecId = 0;
Expand All @@ -111,6 +111,11 @@ public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunn
@Override
public TaskState run() throws Exception
{
if (baseFirehoseFactory.getNumSplits() == 0) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering what if getNumSplits() returns 1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's 1, the native parallel task currently does the same thing: the supervisor task will run a sub task for the single split. Do you think it's better to process in the supervisor task rather than processing in the sub task?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I was thinking in that direction, but I guess it's okay to run a sub task for a single split.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I think it makes sense, but probably better to fix in a separate PR.

log.warn("There's no input split to process");
return TaskState.SUCCESS;
}

final Iterator<ParallelIndexSubTaskSpec> subTaskSpecIterator = subTaskSpecIterator().iterator();
final long taskStatusCheckingPeriod = ingestionSchema.getTuningConfig().getTaskStatusCheckPeriodMs();

Expand Down Expand Up @@ -153,7 +158,7 @@ public TaskState run() throws Exception
if (!subTaskSpecIterator.hasNext()) {
// We have no more subTasks to run
if (taskMonitor.getNumRunningTasks() == 0 && taskCompleteEvents.size() == 0) {
stopped = true;
subTaskScheduleAndMonitorStopped = true;
if (taskMonitor.isSucceeded()) {
// Publishing all segments reported so far
publish(toolbox);
Expand Down Expand Up @@ -182,7 +187,7 @@ public TaskState run() throws Exception
case FAILED:
// TaskMonitor already tried everything it can do for failed tasks. We failed.
state = TaskState.FAILED;
stopped = true;
subTaskScheduleAndMonitorStopped = true;
final TaskStatusPlus lastStatus = taskCompleteEvent.getLastStatus();
if (lastStatus != null) {
log.error("Failed because of the failed sub task[%s]", lastStatus.getId());
Expand All @@ -202,30 +207,39 @@ public TaskState run() throws Exception
}
}
finally {
log.info("Cleaning up resources");
// Cleanup resources
taskCompleteEvents.clear();
taskMonitor.stop();

if (state != TaskState.SUCCESS) {
log.info(
"This task is finished with [%s] state. Killing [%d] remaining subtasks.",
state,
taskMonitor.getNumRunningTasks()
);
// if this fails, kill all sub tasks
// Note: this doesn't work when this task is killed by users. We need a way for gracefully shutting down tasks
// for resource cleanup.
taskMonitor.killAll();
stopInternal();
if (!state.isComplete()) {
state = TaskState.FAILED;
}
}

return state;
}

/**
* Stops this runner on request: raises the flag read by {@code isRunning()} and then releases resources
* through {@link #stopInternal()}.
*/
@Override
public void stopGracefully()
{
// Raise the flag before cleaning up so that isRunning() already reports false while resources are released.
subTaskScheduleAndMonitorStopped = true;
stopInternal();
}

/**
 * Stop task scheduling and monitoring, and kill all running tasks.
 * Pending task-complete events are discarded and, if a task monitor exists, it is stopped.
 * This method is thread-safe.
 */
private void stopInternal()
{
  log.info("Cleaning up resources");
  taskCompleteEvents.clear();

  // Work on a snapshot of the field; it may still be unset when no monitor has been created.
  final TaskMonitor<ParallelIndexSubTask> monitor = taskMonitor;
  if (monitor == null) {
    return;
  }
  monitor.stop();
}

private boolean isRunning()
{
return !stopped && !Thread.currentThread().isInterrupted();
return !subTaskScheduleAndMonitorStopped && !Thread.currentThread().isInterrupted();
}

@VisibleForTesting
Expand All @@ -240,6 +254,13 @@ ParallelIndexIngestionSpec getIngestionSchema()
return ingestionSchema;
}

/**
* Returns the current value of the {@code taskMonitor} field so that tests can inspect it.
* NOTE(review): presumably null until {@code run()} has created the monitor. Confirm.
*/
@VisibleForTesting
@Nullable
TaskMonitor<ParallelIndexSubTask> getTaskMonitor()
{
return taskMonitor;
}

@Override
public void collectReport(PushedSegmentsReport report)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -84,6 +85,11 @@ public class TaskMonitor<T extends Task>
private int numRunningTasks;
private int numSucceededTasks;
private int numFailedTasks;
// This metric is used only for unit tests because the current taskStatus system doesn't track the killed task status.
// Currently, this metric only represents # of killed tasks by ParallelIndexTaskRunner.
// See stop(), SinglePhaseParallelIndexTaskRunner.run(), and
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no method killAllRunningTasks() anywhere in the repo when this PR was checked in.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, thanks. Will fix it soon.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leventov fixed in #8924.

// SinglePhaseParallelIndexTaskRunner.stopGracefully()
private int numKilledTasks;

private boolean running = false;

Expand Down Expand Up @@ -169,11 +175,35 @@ public void start(long taskStatusCheckingPeriod)
}
}

/**
 * Stop task monitoring and kill all running tasks.
 *
 * Marks the monitor as not running, shuts down the status checker, and then requests a kill for every entry
 * still present in {@code runningTasks}. Each killed entry is removed from the map and moved from the running
 * count to the killed count. Calling this method again once no tasks are running only repeats the shutdown
 * request and the final log line.
 */
public void stop()
{
  synchronized (startStopLock) {
    running = false;
    taskStatusChecker.shutdownNow();

    // The task counters are guarded by taskCountLock in the increment methods, so they must be guarded by the
    // same lock here. startStopLock alone does not order these updates against them.
    // NOTE(review): taskCountLock is taken while holding startStopLock; confirm no other path acquires the two
    // locks in the opposite order.
    synchronized (taskCountLock) {
      if (numRunningTasks > 0) {
        final Iterator<MonitorEntry> iterator = runningTasks.values().iterator();
        while (iterator.hasNext()) {
          final MonitorEntry entry = iterator.next();
          // Drop the entry before requesting the kill so that it is no longer tracked as running.
          iterator.remove();
          final String taskId = entry.runningTask.getId();
          log.info("Request to kill subtask[%s]", taskId);
          indexingServiceClient.killTask(taskId);
          numRunningTasks--;
          numKilledTasks++;
        }

        // The counter and the map should empty out together; a leftover count means they diverged.
        if (numRunningTasks > 0) {
          log.warn(
              "Inconsistent state: numRunningTasks[%d] is still not zero after trying to kill all running tasks.",
              numRunningTasks
          );
        }
      }
    }

    log.info("Stopped taskMonitor");
  }
}
Expand Down Expand Up @@ -225,27 +255,14 @@ private void retry(String subTaskSpecId, MonitorEntry monitorEntry, TaskStatusPl
}
}

/**
 * Requests a kill for every sub task currently tracked in {@code runningTasks} and then empties the map.
 * This method should be called after {@link #stop()} to make sure no additional tasks are submitted.
 */
void killAll()
{
  for (MonitorEntry tracked : runningTasks.values()) {
    final String subTaskId = tracked.runningTask.getId();
    log.info("Request to kill subtask[%s]", subTaskId);
    indexingServiceClient.killTask(subTaskId);
  }
  runningTasks.clear();
}

void incrementNumRunningTasks()
private void incrementNumRunningTasks()
{
synchronized (taskCountLock) {
numRunningTasks++;
}
}

void incrementNumSucceededTasks()
private void incrementNumSucceededTasks()
{
synchronized (taskCountLock) {
numRunningTasks--;
Expand All @@ -254,7 +271,7 @@ void incrementNumSucceededTasks()
}
}

void incrementNumFailedTasks()
private void incrementNumFailedTasks()
{
synchronized (taskCountLock) {
numRunningTasks--;
Expand All @@ -276,6 +293,12 @@ int getNumRunningTasks()
}
}

@VisibleForTesting
int getNumKilledTasks()
Copy link
Copy Markdown

@surekhasaharan surekhasaharan Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May make sense to add the annotation @VisibleForTesting here instead

{
return numKilledTasks;
}

SinglePhaseParallelIndexingProgress getProgress()
{
synchronized (taskCountLock) {
Expand Down Expand Up @@ -336,7 +359,7 @@ class MonitorEntry
@Nullable
private volatile TaskStatusPlus runningStatus;

MonitorEntry(
private MonitorEntry(
SubTaskSpec<T> spec,
T runningTask,
@Nullable TaskStatusPlus runningStatus,
Expand Down
Loading