diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index b00c646385a3..56dccbe83515 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -22,12 +22,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SegmentListUsedAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.java.util.common.JodaUtils; @@ -43,6 +46,7 @@ import org.joda.time.Period; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -50,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -64,12 +69,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask private final SegmentLockHelper segmentLockHelper; + @GuardedBy("this") + private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner(); 
+ /** * State to indicate that this task will use segmentLock or timeChunkLock. * This is automatically set when {@link #determineLockGranularityandTryLock} is called. */ private boolean useSegmentLock; + @GuardedBy("this") + private boolean stopped = false; + protected AbstractBatchIndexTask(String id, String dataSource, Map context) { super(id, dataSource, context); @@ -88,6 +99,60 @@ protected AbstractBatchIndexTask( segmentLockHelper = new SegmentLockHelper(); } + /** + * Run this task. Before running the task, it checks whether the current task is already stopped and + * registers a cleaner to interrupt the thread running this task on abnormal exits. + * + * @see #runTask(TaskToolbox) + * @see #stopGracefully(TaskConfig) + */ + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + synchronized (this) { + if (stopped) { + return TaskStatus.failure(getId()); + } else { + // Register the cleaner to interrupt the current thread first. + // Since the resource closer cleans up the registered resources in LIFO order, + // this will be executed last on abnormal exits. + // The order is sometimes important. For example, Appenderator has two methods of close() and closeNow(), and + // closeNow() is supposed to be called on abnormal exits. Interrupting the current thread could lead to close() + // being called indirectly, e.g., for Appenderators in try-with-resources. In this case, closeNow() should be + // called before the current thread is interrupted, so that subsequent close() calls can be ignored. + final Thread currentThread = Thread.currentThread(); + resourceCloserOnAbnormalExit.register(config -> currentThread.interrupt()); + } + } + return runTask(toolbox); + } + + @Override + public void stopGracefully(TaskConfig taskConfig) + { + synchronized (this) { + stopped = true; + resourceCloserOnAbnormalExit.clean(taskConfig); + } + } + + /** + * Registers a resource cleaner which is executed on abnormal exits. 
+ * + * @see Task#stopGracefully + */ + protected void registerResourceCloserOnAbnormalExit(Consumer cleaner) + { + synchronized (this) { + resourceCloserOnAbnormalExit.register(cleaner); + } + } + + /** + * The method to actually process this task. This method is executed in {@link #run(TaskToolbox)}. + */ + public abstract TaskStatus runTask(TaskToolbox toolbox) throws Exception; + /** * Return true if this task can overwrite existing segments. */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java index 9582b987876c..3b117de942fe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; import org.joda.time.Interval; import java.util.Map; @@ -80,4 +81,9 @@ public Interval getInterval() { return interval; } + + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index 370b30ed1ec8..c901e685f7fe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -28,7 +28,6 @@ import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import 
org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; @@ -154,16 +153,6 @@ public boolean canRestore() return false; } - /** - * Should be called independent of canRestore so that resource cleaning can be achieved. - * If resource cleaning is required, concrete class should override this method - */ - @Override - public void stopGracefully(TaskConfig taskConfig) - { - // Do nothing and let the concrete class handle it - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 3c22d2043c4f..564c965723f5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -437,7 +437,7 @@ public TaskStatus run(final TaskToolbox toolbox) } CloseQuietly.close(firehose); - CloseQuietly.close(appenderator); + appenderator.close(); CloseQuietly.close(driver); toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 1f16f01ca56a..b087c7c43e1c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -100,6 +100,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; import 
java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.StreamSupport; @@ -109,6 +110,12 @@ public class CompactionTask extends AbstractBatchIndexTask private static final Logger log = new Logger(CompactionTask.class); private static final String TYPE = "compact"; + /** + * A flag to indicate this task is already stopped and its child indexTasks shouldn't be created. + * See {@link #currentRunningTaskSpec} for more details. + */ + private static final Object SPECIAL_VALUE_STOPPED = new Object(); + private final Interval interval; private final List segments; @Nullable @@ -146,10 +153,22 @@ public class CompactionTask extends AbstractBatchIndexTask private final RetryPolicyFactory retryPolicyFactory; @JsonIgnore - private List indexTaskSpecs; + private final AppenderatorsManager appenderatorsManager; @JsonIgnore - private AppenderatorsManager appenderatorsManager; + private List indexTaskSpecs; + + /** + * Reference to the sub-task that is currently running. + * + * When {@link #stopGracefully} is called, the compaction task gets the reference to the current running task, + * and calls {@link #stopGracefully} for that task. This reference will be updated to {@link #SPECIAL_VALUE_STOPPED}. + * + * Note that {@link #stopGracefully} can be called at any time during {@link #run}. Calling {@link #stopGracefully} + * on the current running task and setting this reference to {@link #SPECIAL_VALUE_STOPPED} should be done atomically. 
+ */ + @Nullable + private final AtomicReference currentRunningTaskSpec = new AtomicReference<>(); @JsonCreator public CompactionTask( @@ -289,7 +308,7 @@ public boolean isPerfectRollup() } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatus runTask(TaskToolbox toolbox) throws Exception { if (indexTaskSpecs == null) { final List ingestionSpecs = createIngestionSchema( @@ -330,12 +349,24 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception log.info("Generated [%d] compaction task specs", totalNumSpecs); int failCnt = 0; + registerResourceCloserOnAbnormalExit(config -> { + Object currentRunningTask = currentRunningTaskSpec.getAndSet(SPECIAL_VALUE_STOPPED); + if (currentRunningTask != null) { + ((IndexTask) currentRunningTask).stopGracefully(config); + } + }); for (IndexTask eachSpec : indexTaskSpecs) { + Object prevSpec = currentRunningTaskSpec.get(); + //noinspection ObjectEquality + if (prevSpec == SPECIAL_VALUE_STOPPED || !currentRunningTaskSpec.compareAndSet(prevSpec, eachSpec)) { + log.info("Task is asked to stop. 
Finish as failed."); + return TaskStatus.failure(getId()); + } final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); - log.info("Running indexSpec: " + json); try { if (eachSpec.isReady(toolbox.getTaskActionClient())) { + log.info("Running indexSpec: " + json); final TaskStatus eachResult = eachSpec.run(toolbox); if (!eachResult.isSuccess()) { failCnt++; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 7557242d2fc6..86ac882e87d7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -128,13 +128,6 @@ private static String getTheDataSource(HadoopIngestionSpec spec) @JsonIgnore private String errorMsg; - @JsonIgnore - private Thread runThread; - - @JsonIgnore - private boolean stopped = false; - - /** * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters * for creating Druid index segments. It may be modified. 
@@ -264,22 +257,24 @@ public String getClasspathPrefix() return classpathPrefix; } - public String getHadoopJobIdFileName() + private String getHadoopJobIdFileName() { - return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath(); + return getHadoopJobIdFile().getAbsolutePath(); } - @Override - public TaskStatus run(TaskToolbox toolbox) + private boolean hadoopJobIdFileExists() { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } else { - runThread = Thread.currentThread(); - } - } + return getHadoopJobIdFile().exists(); + } + private File getHadoopJobIdFile() + { + return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME); + } + + @Override + public TaskStatus runTask(TaskToolbox toolbox) + { try { taskConfig = toolbox.getConfig(); if (chatHandlerProvider.isPresent()) { @@ -319,6 +314,7 @@ public TaskStatus run(TaskToolbox toolbox) @SuppressWarnings("unchecked") private TaskStatus runInternal(TaskToolbox toolbox) throws Exception { + registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); String hadoopJobIdFile = getHadoopJobIdFileName(); final ClassLoader loader = buildClassLoader(toolbox); boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); @@ -475,33 +471,25 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception } } - @Override - public void stopGracefully(TaskConfig taskConfig) + private void killHadoopJob() { - synchronized (this) { - stopped = true; - if (runThread == null) { - // didn't actually start, just return - return; - } - } // To avoid issue of kill command once the ingestion task is actually completed - if (!ingestionState.equals(IngestionState.COMPLETED)) { + if (hadoopJobIdFileExists() && !ingestionState.equals(IngestionState.COMPLETED)) { final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); String hadoopJobIdFile = getHadoopJobIdFileName(); try { - ClassLoader loader = 
HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(), - taskConfig.getDefaultHadoopCoordinates()); + ClassLoader loader = HadoopTask.buildClassLoader( + getHadoopDependencyCoordinates(), + taskConfig.getDefaultHadoopCoordinates() + ); Object killMRJobInnerProcessingRunner = getForeignClassloaderObject( "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", loader ); - String[] buildKillJobInput = new String[]{ - hadoopJobIdFile - }; + String[] buildKillJobInput = new String[]{hadoopJobIdFile}; Class buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass(); Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass()); @@ -519,7 +507,6 @@ public void stopGracefully(TaskConfig taskConfig) } finally { Thread.currentThread().setContextClassLoader(oldLoader); - runThread.interrupt(); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 16254de351ad..c314cef7f844 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -51,7 +51,6 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; @@ -178,15 +177,6 @@ private static String makeGroupId(boolean isAppendToExisting, String dataSource) @JsonIgnore private final AppenderatorsManager appenderatorsManager; - @JsonIgnore - private Thread 
runThread; - - @JsonIgnore - private boolean stopped = false; - - @JsonIgnore - private Appenderator appenderator; - @JsonCreator public IndexTask( @JsonProperty("id") final String id, @@ -421,16 +411,8 @@ public IndexIngestionSpec getIngestionSchema() } @Override - public TaskStatus run(final TaskToolbox toolbox) + public TaskStatus runTask(final TaskToolbox toolbox) { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } else { - runThread = Thread.currentThread(); - } - } - try { if (chatHandlerProvider.isPresent()) { log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); @@ -513,23 +495,6 @@ public TaskStatus run(final TaskToolbox toolbox) } } - @Override - public void stopGracefully(TaskConfig taskConfig) - { - synchronized (this) { - stopped = true; - // Nothing else to do for native batch except terminate - if (ingestionState != IngestionState.COMPLETED) { - if (appenderator != null) { - appenderator.closeNow(); - } - if (runThread != null) { - runThread.interrupt(); - } - } - } - } - private Map getTaskCompletionReports() { return TaskReport.buildTaskReports( @@ -925,17 +890,17 @@ private TaskStatus generateAndPublishSegments( toolbox.getTaskActionClient() .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish)); + final Appenderator appenderator = newAppenderator( + buildSegmentsFireDepartmentMetrics, + toolbox, + dataSchema, + tuningConfig + ); + boolean exceptionOccurred = false; try ( - final Appenderator appenderator = newAppenderator( - buildSegmentsFireDepartmentMetrics, - toolbox, - dataSchema, - tuningConfig - ); final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { - this.appenderator = appenderator; driver.startJob(); @@ -1007,6 +972,7 @@ private TaskStatus generateAndPublishSegments( : null; // Probably we 
can publish atomicUpdateGroup along with segments. final SegmentsAndMetadata published = awaitPublish(driver.publishAll(inputSegments, publisher), pushTimeout); + appenderator.close(); ingestionState = IngestionState.COMPLETED; if (published == null) { @@ -1031,8 +997,20 @@ private TaskStatus generateAndPublishSegments( } } catch (TimeoutException | ExecutionException e) { + exceptionOccurred = true; throw new RuntimeException(e); } + catch (Exception e) { + exceptionOccurred = true; + throw e; + } + finally { + if (exceptionOccurred) { + appenderator.closeNow(); + } else { + appenderator.close(); + } + } } private void handleParseException(ParseException e) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java index 1bbe4eeba303..8fcf252f39e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java @@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -138,6 +139,11 @@ public boolean isReady(TaskActionClient taskActionClient) } } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 45aabd8d33b3..097bf7474ba8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -161,8 +161,22 @@ default int getPriority() boolean canRestore(); /** - * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be terminated with - * extreme prejudice. + * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be + * terminated with extreme prejudice. + * + * This method can be called at any time no matter when {@link #run} is executed. Regardless of when this method is + * called with respect to {@link #run}, its implementations must not allow a resource leak or lingering executions + * (local or remote). + * + * Depending on the task executor type, one of the two cases below can happen when the task is killed. + * - When the task is executed by a middleManager, {@link org.apache.druid.indexing.overlord.ForkingTaskRunner} kills + * the process running the task, which triggers + * {@link org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner#stop}. + * - When the task is executed by an indexer, {@link org.apache.druid.indexing.overlord.ThreadingTaskRunner#shutdown} + * calls this method directly. + * + * If the task has some resources to clean up on abnormal exit, e.g., sub tasks of parallel indexing task + * or Hadoop jobs spawned by Hadoop indexing tasks, those resource cleanups should be done in this method. 
* * @param taskConfig TaskConfig for this task */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java new file mode 100644 index 000000000000..a96cd7009c58 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import org.apache.druid.indexing.common.config.TaskConfig; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.function.Consumer; + +/** + * Executes all registered {@link Consumer}s in LIFO order. + * Similar to {@link org.apache.druid.java.util.common.io.Closer}, but this class is tweaked to be used in + * {@link Task#stopGracefully(TaskConfig)}. 
+ */ +public class TaskResourceCleaner +{ + private final Deque> stack = new ArrayDeque<>(4); + + public void register(Consumer cleaner) + { + stack.addFirst(cleaner); + } + + public void clean(TaskConfig config) + { + Throwable throwable = null; + + // Clean up in LIFO order + while (!stack.isEmpty()) { + final Consumer cleaner = stack.removeFirst(); + try { + cleaner.accept(config); + } + catch (Throwable t) { + if (throwable == null) { + throwable = t; + } else { + suppress(throwable, t); + } + } + } + + if (throwable != null) { + throw new RuntimeException(throwable); + } + } + + private void suppress(Throwable thrown, Throwable suppressed) + { + //noinspection ObjectEquality + if (thrown != suppressed) { + thrown.addSuppressed(suppressed); + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index fb902cbe067d..3e3cf80bc180 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SurrogateAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider; import org.apache.druid.indexing.common.task.IndexTask; @@ -107,10 +106,6 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask private final IndexTaskClientFactory taskClientFactory; private final AppenderatorsManager appenderatorsManager; - private Appenderator appenderator; - private 
Thread runThread; - private boolean stopped = false; - /** * If intervals are missing in the granularitySpec, parallel index task runs in "dynamic locking mode". * In this mode, sub tasks ask new locks whenever they see a new row which is not covered by existing locks. @@ -205,16 +200,8 @@ public String getSupervisorTaskId() } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatus runTask(final TaskToolbox toolbox) throws Exception { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } else { - runThread = Thread.currentThread(); - } - } - if (missingIntervalsInOverwriteMode) { LOG.warn( "Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. " @@ -425,12 +412,12 @@ private Set generateAndPushSegments( final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent(); final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient); + final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig); + boolean exceptionOccurred = false; try ( - final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig); final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { - this.appenderator = appenderator; driver.startJob(); final Set pushedSegments = new HashSet<>(); @@ -496,12 +483,25 @@ private Set generateAndPushSegments( final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); pushedSegments.addAll(pushed.getSegments()); LOG.info("Pushed segments[%s]", pushed.getSegments()); + appenderator.close(); return pushedSegments; } catch (TimeoutException | ExecutionException e) { + exceptionOccurred = true; throw new RuntimeException(e); } + catch (Exception e) { + exceptionOccurred = true; + 
throw e; + } + finally { + if (exceptionOccurred) { + appenderator.closeNow(); + } else { + appenderator.close(); + } + } } private Appenderator newAppenderator( @@ -536,18 +536,4 @@ private static BatchAppenderatorDriver newDriver( toolbox.getDataSegmentKiller() ); } - - @Override - public void stopGracefully(TaskConfig taskConfig) - { - synchronized (this) { - stopped = true; - if (appenderator != null) { - appenderator.closeNow(); - } - if (runThread != null) { - runThread.interrupt(); - } - } - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index c8cd3d9ddaff..e6ef2c485c9f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; import org.apache.druid.indexing.common.task.IndexTask; @@ -128,8 +127,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private volatile ParallelIndexTaskRunner runner; private volatile IndexTask sequentialIndexTask; - private boolean stopped = false; - // toolbox is initlized when run() is called, and can be used for processing HTTP endpoint requests. 
private volatile TaskToolbox toolbox; @@ -287,20 +284,7 @@ public Granularity getSegmentGranularity() } @Override - public void stopGracefully(TaskConfig taskConfig) - { - synchronized (this) { - stopped = true; - } - if (runner != null) { - runner.stopGracefully(); - } else if (sequentialIndexTask != null) { - sequentialIndexTask.stopGracefully(taskConfig); - } - } - - @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatus runTask(TaskToolbox toolbox) throws Exception { if (missingIntervalsInOverwriteMode) { LOG.warn( @@ -357,39 +341,31 @@ void setToolbox(TaskToolbox toolbox) private TaskStatus runParallel(TaskToolbox toolbox) throws Exception { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } - createRunner(toolbox); - } + createRunner(toolbox); + registerResourceCloserOnAbnormalExit(config -> runner.stopGracefully()); return TaskStatus.fromCode(getId(), Preconditions.checkNotNull(runner, "runner").run()); } private TaskStatus runSequential(TaskToolbox toolbox) throws Exception { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } - sequentialIndexTask = new IndexTask( - getId(), - getGroupId(), - getTaskResource(), - getDataSource(), - new IndexIngestionSpec( - getIngestionSchema().getDataSchema(), - getIngestionSchema().getIOConfig(), - convertToIndexTuningConfig(getIngestionSchema().getTuningConfig()) - ), - getContext(), - authorizerMapper, - chatHandlerProvider, - rowIngestionMetersFactory, - appenderatorsManager - ); - } + sequentialIndexTask = new IndexTask( + getId(), + getGroupId(), + getTaskResource(), + getDataSource(), + new IndexIngestionSpec( + getIngestionSchema().getDataSchema(), + getIngestionSchema().getIOConfig(), + convertToIndexTuningConfig(getIngestionSchema().getTuningConfig()) + ), + getContext(), + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory, + appenderatorsManager + ); if 
(sequentialIndexTask.isReady(toolbox.getTaskActionClient())) { + registerResourceCloserOnAbnormalExit(config -> sequentialIndexTask.stopGracefully(config)); return sequentialIndexTask.run(toolbox); } else { return TaskStatus.failure(getId()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java index 7879d8753569..d501b980d8cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.Task; @@ -68,6 +69,11 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) { @@ -96,6 +102,11 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java index 42c864163587..2ec6b1200e88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java @@ -69,6 +69,11 @@ public boolean isReady(TaskActionClient taskActionClient) return false; } + @Override + public void 
stopGracefully(TaskConfig taskConfig) + { + } + @Override public boolean requireLockExistingSegments() { @@ -95,7 +100,7 @@ public Granularity getSegmentGranularity() } @Override - public TaskStatus run(TaskToolbox toolbox) + public TaskStatus runTask(TaskToolbox toolbox) { return null; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 7d90a062a4af..8ddf6cc299fb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -40,11 +40,14 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.hamcrest.CoreMatchers; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import javax.annotation.Nullable; import java.io.IOException; @@ -52,12 +55,16 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.Stream; public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSupervisorTaskTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private ExecutorService service; @Before @@ -98,7 +105,9 @@ public void testStopGracefully() throws Exception Thread.sleep(100); } task.stopGracefully(null); - 
Assert.assertEquals(TaskState.FAILED, future.get()); + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class)); + future.get(); final TestParallelIndexTaskRunner runner = (TestParallelIndexTaskRunner) task.getRunner(); Assert.assertTrue(runner.getRunningTaskIds().isEmpty()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java index 8724ba9c125a..40141412e8ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.java.util.common.Intervals; @@ -60,6 +61,11 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index f7dbfdee460a..475b8ef6f68f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -947,6 +947,11 @@ public String getType() return "test"; } + @Override + public void 
stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { @@ -990,6 +995,11 @@ public String getType() return "test"; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { @@ -1024,6 +1034,11 @@ public String getType() return "test"; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 004c15b3527e..db0fe75e155e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; @@ -1232,6 +1233,11 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 3a58c0cf6278..159192562798 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -31,6 +31,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; @@ -1279,6 +1280,11 @@ public boolean isReady(TaskActionClient taskActionClient) return false; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java index 7ad67ecc8afe..ce5ac8f89d81 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java @@ -29,7 +29,6 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException; import javax.annotation.Nullable; -import java.io.Closeable; import java.util.Collection; import java.util.List; @@ -46,7 +45,7 @@ * all methods of the data appending and indexing lifecycle except {@link #drop} must be called from a single thread. * Methods inherited from {@link QuerySegmentWalker} can be called concurrently from multiple threads. */ -public interface Appenderator extends QuerySegmentWalker, Closeable +public interface Appenderator extends QuerySegmentWalker { /** * Return the name of the dataSource associated with this Appenderator. @@ -200,7 +199,6 @@ ListenableFuture push( * pushes to finish. This will not remove any on-disk persisted data, but it will drop any data that has not yet been * persisted. 
*/ - @Override void close(); /** diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index b92bd1210422..fed1d705e9c9 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -154,11 +154,12 @@ public int columnCacheSizeBytes() null ); - try (Appenderator appenderator = defaultOfflineAppenderatorFactory.build( + Appenderator appenderator = defaultOfflineAppenderatorFactory.build( schema, tuningConfig, new FireDepartmentMetrics() - )) { + ); + try { Assert.assertEquals("dataSourceName", appenderator.getDataSource()); Assert.assertEquals(null, appenderator.startJob()); SegmentIdWithShardSpec identifier = new SegmentIdWithShardSpec( @@ -175,5 +176,8 @@ public int columnCacheSizeBytes() appenderator.close(); Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); } + finally { + appenderator.close(); + } } }