Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.java.util.common.JodaUtils;
Expand All @@ -43,13 +46,15 @@
import org.joda.time.Period;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -64,12 +69,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask

private final SegmentLockHelper segmentLockHelper;

@GuardedBy("this")
private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner();

/**
* State to indicate that this task will use segmentLock or timeChunkLock.
* This is automatically set when {@link #determineLockGranularityandTryLock} is called.
*/
private boolean useSegmentLock;

@GuardedBy("this")
private boolean stopped = false;

protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context)
{
super(id, dataSource, context);
Expand All @@ -88,6 +99,60 @@ protected AbstractBatchIndexTask(
segmentLockHelper = new SegmentLockHelper();
}

/**
 * Run this task. Before running the task, it checks whether the current task has already been
 * stopped and, if not, registers a cleaner to interrupt the thread running this task on abnormal
 * exits.
 *
 * @see #runTask(TaskToolbox)
 * @see #stopGracefully(TaskConfig)
 */
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
  synchronized (this) {
    if (stopped) {
      // stopGracefully() was called before this task started running; fail fast without starting.
      return TaskStatus.failure(getId());
    } else {
      // Register the cleaner to interrupt the current thread first.
      // Since the resource closer cleans up the registered resources in LIFO order,
      // this will be executed last on abnormal exits.
      // The order is sometimes important. For example, Appenderator has two methods of close() and closeNow(), and
      // closeNow() is supposed to be called on abnormal exits. Interrupting the current thread could lead to close()
      // to be called indirectly, e.g., for Appenderators in try-with-resources. In this case, closeNow() should be
      // called before the current thread is interrupted, so that subsequent close() calls can be ignored.
      final Thread currentThread = Thread.currentThread();
      resourceCloserOnAbnormalExit.register(config -> currentThread.interrupt());
    }
  }
  // Intentionally outside the synchronized block: holding the monitor here would deadlock
  // stopGracefully(), which synchronizes on this while the task runs.
  return runTask(toolbox);
}

/**
 * Stop this task gracefully. Marks the task stopped so that {@link #run} fails fast if it has not
 * started yet, then runs the registered cleaners in LIFO order — the last of which (registered
 * first in {@link #run}) interrupts the thread executing {@link #runTask}.
 */
@Override
public void stopGracefully(TaskConfig taskConfig)
{
  synchronized (this) {
    stopped = true;
    resourceCloserOnAbnormalExit.clean(taskConfig);
  }
}

/**
 * Registers a resource cleaner to be executed when this task exits abnormally.
 *
 * @param cleaner callback invoked with the current {@link TaskConfig} on abnormal exit
 *
 * @see Task#stopGracefully
 */
protected synchronized void registerResourceCloserOnAbnormalExit(Consumer<TaskConfig> cleaner)
{
  resourceCloserOnAbnormalExit.register(cleaner);
}

/**
 * The method to actually process this task. This method is executed in {@link #run(TaskToolbox)}
 * after the stopped-state check and cleaner registration performed there.
 */
public abstract TaskStatus runTask(TaskToolbox toolbox) throws Exception;

/**
* Return true if this task can overwrite existing segments.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.joda.time.Interval;

import java.util.Map;
Expand Down Expand Up @@ -80,4 +81,9 @@ public Interval getInterval()
{
return interval;
}

@Override
public void stopGracefully(TaskConfig taskConfig)
{
  // Intentionally a no-op: this task registers no resources that need cleanup on stop.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
Expand Down Expand Up @@ -154,16 +153,6 @@ public boolean canRestore()
return false;
}

/**
* Should be called independent of canRestore so that resource cleaning can be achieved.
* If resource cleaning is required, concrete class should override this method
*/
@Override
public void stopGracefully(TaskConfig taskConfig)
{
// Do nothing and let the concrete class handle it
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public TaskStatus run(final TaskToolbox toolbox)
}

CloseQuietly.close(firehose);
CloseQuietly.close(appenderator);
appenderator.close();
CloseQuietly.close(driver);

toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
Expand All @@ -109,6 +110,12 @@ public class CompactionTask extends AbstractBatchIndexTask
private static final Logger log = new Logger(CompactionTask.class);
private static final String TYPE = "compact";

/**
* A flag to indicate this task is already stopped and its child indexTasks shouldn't be created.
* See {@link #currentRunningTaskSpec} for more details.
*/
private static final Object SPECIAL_VALUE_STOPPED = new Object();

private final Interval interval;
private final List<DataSegment> segments;
@Nullable
Expand Down Expand Up @@ -146,10 +153,22 @@ public class CompactionTask extends AbstractBatchIndexTask
private final RetryPolicyFactory retryPolicyFactory;

@JsonIgnore
private List<IndexTask> indexTaskSpecs;
private final AppenderatorsManager appenderatorsManager;

@JsonIgnore
private AppenderatorsManager appenderatorsManager;
private List<IndexTask> indexTaskSpecs;

/**
* Reference to the sub-task that is currently running.
*
* When {@link #stopGracefully} is called, the compaction task gets the reference to the current running task,
* and calls {@link #stopGracefully} for that task. This reference will be updated to {@link #SPECIAL_VALUE_STOPPED}.
*
* Note that {@link #stopGracefully} can be called at any time during {@link #run}. Calling {@link #stopGracefully}
* on the current running task and setting this reference to {@link #SPECIAL_VALUE_STOPPED} should be done atomically.
*/
@Nullable
private final AtomicReference<Object> currentRunningTaskSpec = new AtomicReference<>();

@JsonCreator
public CompactionTask(
Expand Down Expand Up @@ -289,7 +308,7 @@ public boolean isPerfectRollup()
}

@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
if (indexTaskSpecs == null) {
final List<IndexIngestionSpec> ingestionSpecs = createIngestionSchema(
Expand Down Expand Up @@ -330,12 +349,24 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
log.info("Generated [%d] compaction task specs", totalNumSpecs);

int failCnt = 0;
registerResourceCloserOnAbnormalExit(config -> {
Object currentRunningTask = currentRunningTaskSpec.getAndSet(SPECIAL_VALUE_STOPPED);
if (currentRunningTask != null) {
((IndexTask) currentRunningTask).stopGracefully(config);
}
});
for (IndexTask eachSpec : indexTaskSpecs) {
Object prevSpec = currentRunningTaskSpec.get();
//noinspection ObjectEquality
if (prevSpec == SPECIAL_VALUE_STOPPED || !currentRunningTaskSpec.compareAndSet(prevSpec, eachSpec)) {
log.info("Task is asked to stop. Finish as failed.");
return TaskStatus.failure(getId());
}
final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
log.info("Running indexSpec: " + json);

try {
if (eachSpec.isReady(toolbox.getTaskActionClient())) {
log.info("Running indexSpec: " + json);
final TaskStatus eachResult = eachSpec.run(toolbox);
if (!eachResult.isSuccess()) {
failCnt++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,6 @@ private static String getTheDataSource(HadoopIngestionSpec spec)
@JsonIgnore
private String errorMsg;

@JsonIgnore
private Thread runThread;

@JsonIgnore
private boolean stopped = false;


/**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
Expand Down Expand Up @@ -264,22 +257,24 @@ public String getClasspathPrefix()
return classpathPrefix;
}

public String getHadoopJobIdFileName()
private String getHadoopJobIdFileName()
{
return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath();
return getHadoopJobIdFile().getAbsolutePath();
}

@Override
public TaskStatus run(TaskToolbox toolbox)
private boolean hadoopJobIdFileExists()
{
synchronized (this) {
if (stopped) {
return TaskStatus.failure(getId());
} else {
runThread = Thread.currentThread();
}
}
return getHadoopJobIdFile().exists();
}

private File getHadoopJobIdFile()
{
return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME);
}

@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
try {
taskConfig = toolbox.getConfig();
if (chatHandlerProvider.isPresent()) {
Expand Down Expand Up @@ -319,6 +314,7 @@ public TaskStatus run(TaskToolbox toolbox)
@SuppressWarnings("unchecked")
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
String hadoopJobIdFile = getHadoopJobIdFileName();
final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
Expand Down Expand Up @@ -475,33 +471,25 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
}
}

@Override
public void stopGracefully(TaskConfig taskConfig)
private void killHadoopJob()
{
synchronized (this) {
stopped = true;
if (runThread == null) {
// didn't actually start, just return
return;
}
}
// To avoid issue of kill command once the ingestion task is actually completed
if (!ingestionState.equals(IngestionState.COMPLETED)) {
if (hadoopJobIdFileExists() && !ingestionState.equals(IngestionState.COMPLETED)) {
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
String hadoopJobIdFile = getHadoopJobIdFileName();

try {
ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
taskConfig.getDefaultHadoopCoordinates());
ClassLoader loader = HadoopTask.buildClassLoader(
getHadoopDependencyCoordinates(),
taskConfig.getDefaultHadoopCoordinates()
);

Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
loader
);

String[] buildKillJobInput = new String[]{
hadoopJobIdFile
};
String[] buildKillJobInput = new String[]{hadoopJobIdFile};

Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
Expand All @@ -519,7 +507,6 @@ public void stopGracefully(TaskConfig taskConfig)
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
runThread.interrupt();
}
}
}
Expand Down
Loading