From 5a98275a5e49b7c3eb8c3c5af5a528da732b400a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 3 Aug 2019 13:04:21 -0700 Subject: [PATCH 01/17] Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks --- .../common/task/AbstractBatchIndexTask.java | 57 +++++++++++++++ .../task/AbstractFixedIntervalTask.java | 6 ++ .../indexing/common/task/AbstractTask.java | 11 --- .../indexing/common/task/CompactionTask.java | 15 +++- .../indexing/common/task/HadoopIndexTask.java | 51 +++++-------- .../druid/indexing/common/task/IndexTask.java | 39 +--------- .../druid/indexing/common/task/NoopTask.java | 6 ++ .../druid/indexing/common/task/Task.java | 7 +- .../common/task/TaskResourceCleaner.java | 73 +++++++++++++++++++ .../batch/parallel/ParallelIndexSubTask.java | 31 +------- .../parallel/ParallelIndexSupervisorTask.java | 64 +++++----------- .../druid/indexing/common/TestTasks.java | 11 +++ .../indexing/common/task/HadoopTaskTest.java | 7 +- .../indexing/overlord/RealtimeishTask.java | 6 ++ .../indexing/overlord/TaskLifecycleTest.java | 15 ++++ .../indexing/overlord/TaskLockboxTest.java | 6 ++ .../overlord/http/OverlordResourceTest.java | 6 ++ 17 files changed, 251 insertions(+), 160 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index b00c646385a3..f1cd3e76d220 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -22,12 +22,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.indexer.TaskStatus; 
import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SegmentListUsedAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.java.util.common.JodaUtils; @@ -43,6 +46,7 @@ import org.joda.time.Period; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -50,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -64,12 +69,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask private final SegmentLockHelper segmentLockHelper; + @GuardedBy("this") + private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner(); + /** * State to indicate that this task will use segmentLock or timeChunkLock. * This is automatically set when {@link #determineLockGranularityandTryLock} is called. */ private boolean useSegmentLock; + @GuardedBy("this") + private boolean stopped = false; + protected AbstractBatchIndexTask(String id, String dataSource, Map context) { super(id, dataSource, context); @@ -88,6 +99,52 @@ protected AbstractBatchIndexTask( segmentLockHelper = new SegmentLockHelper(); } + /** + * Run this task. Before running the task, it checks whether the current task is already stopped and + * registers a cleaner to interrupt the thread running this task on abnormal exits. 
+ * + * @see #runTask(TaskToolbox) + * @see #stopGracefully(TaskConfig) + */ + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + synchronized (this) { + if (stopped) { + return TaskStatus.failure(getId()); + } else { + resourceCloserOnAbnormalExit.register(config -> Thread.currentThread().interrupt()); + } + } + return runTask(toolbox); + } + + @Override + public void stopGracefully(TaskConfig taskConfig) + { + synchronized (this) { + stopped = true; + resourceCloserOnAbnormalExit.clean(taskConfig); + } + } + + /** + * Registers a resource cleaner which is executed on abnormal exits. + * + * @see Task#stopGracefully + */ + protected void registerResourceCloserOnAbnormalExit(Consumer cleaner) + { + synchronized (this) { + resourceCloserOnAbnormalExit.register(cleaner); + } + } + + /** + * The method to actually process this task. This method is executed in {@link #run(TaskToolbox)}. + */ + public abstract TaskStatus runTask(TaskToolbox toolbox) throws Exception; + /** * Return true if this task can overwrite existing segments. 
*/ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java index 9582b987876c..3b117de942fe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; import org.joda.time.Interval; import java.util.Map; @@ -80,4 +81,9 @@ public Interval getInterval() { return interval; } + + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index 370b30ed1ec8..c901e685f7fe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -28,7 +28,6 @@ import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; @@ -154,16 +153,6 @@ public boolean canRestore() return false; } - /** - * Should be called independent of canRestore so that resource cleaning can be achieved. 
- * If resource cleaning is required, concrete class should override this method - */ - @Override - public void stopGracefully(TaskConfig taskConfig) - { - // Do nothing and let the concrete class handle it - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 1f16f01ca56a..e40b1eb009d9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -146,10 +146,13 @@ public class CompactionTask extends AbstractBatchIndexTask private final RetryPolicyFactory retryPolicyFactory; @JsonIgnore - private List indexTaskSpecs; + private final AppenderatorsManager appenderatorsManager; @JsonIgnore - private AppenderatorsManager appenderatorsManager; + private List indexTaskSpecs; + + @Nullable + private volatile IndexTask currentRunningTaskSpec = null; @JsonCreator public CompactionTask( @@ -289,7 +292,7 @@ public boolean isPerfectRollup() } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatus runTask(TaskToolbox toolbox) throws Exception { if (indexTaskSpecs == null) { final List ingestionSpecs = createIngestionSchema( @@ -330,12 +333,18 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception log.info("Generated [%d] compaction task specs", totalNumSpecs); int failCnt = 0; + registerResourceCloserOnAbnormalExit(config -> { + if (currentRunningTaskSpec != null) { + currentRunningTaskSpec.stopGracefully(config); + } + }); for (IndexTask eachSpec : indexTaskSpecs) { final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); log.info("Running indexSpec: " + json); try { if (eachSpec.isReady(toolbox.getTaskActionClient())) { + currentRunningTaskSpec = eachSpec; final 
TaskStatus eachResult = eachSpec.run(toolbox); if (!eachResult.isSuccess()) { failCnt++; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 7557242d2fc6..8b7f653d5958 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -128,13 +128,6 @@ private static String getTheDataSource(HadoopIngestionSpec spec) @JsonIgnore private String errorMsg; - @JsonIgnore - private Thread runThread; - - @JsonIgnore - private boolean stopped = false; - - /** * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters * for creating Druid index segments. It may be modified. @@ -264,22 +257,24 @@ public String getClasspathPrefix() return classpathPrefix; } - public String getHadoopJobIdFileName() + private String getHadoopJobIdFileName() { - return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath(); + return getHadoopJobIdFile().getAbsolutePath(); } - @Override - public TaskStatus run(TaskToolbox toolbox) + private boolean hadoopJobIdFileExists() { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } else { - runThread = Thread.currentThread(); - } - } + return getHadoopJobIdFile().exists(); + } + private File getHadoopJobIdFile() + { + return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME); + } + + @Override + public TaskStatus runTask(TaskToolbox toolbox) + { try { taskConfig = toolbox.getConfig(); if (chatHandlerProvider.isPresent()) { @@ -319,6 +314,7 @@ public TaskStatus run(TaskToolbox toolbox) @SuppressWarnings("unchecked") private TaskStatus runInternal(TaskToolbox toolbox) throws Exception { + registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); String hadoopJobIdFile = 
getHadoopJobIdFileName(); final ClassLoader loader = buildClassLoader(toolbox); boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); @@ -475,33 +471,23 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception } } - @Override - public void stopGracefully(TaskConfig taskConfig) + private void killHadoopJob() { - synchronized (this) { - stopped = true; - if (runThread == null) { - // didn't actually start, just return - return; - } - } // To avoid issue of kill command once the ingestion task is actually completed - if (!ingestionState.equals(IngestionState.COMPLETED)) { + if (hadoopJobIdFileExists() && !ingestionState.equals(IngestionState.COMPLETED)) { final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); String hadoopJobIdFile = getHadoopJobIdFileName(); try { ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(), - taskConfig.getDefaultHadoopCoordinates()); + taskConfig.getDefaultHadoopCoordinates()); Object killMRJobInnerProcessingRunner = getForeignClassloaderObject( "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", loader ); - String[] buildKillJobInput = new String[]{ - hadoopJobIdFile - }; + String[] buildKillJobInput = new String[]{hadoopJobIdFile}; Class buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass(); Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass()); @@ -519,7 +505,6 @@ public void stopGracefully(TaskConfig taskConfig) } finally { Thread.currentThread().setContextClassLoader(oldLoader); - runThread.interrupt(); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 16254de351ad..25689263f459 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java 
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -51,7 +51,6 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; @@ -178,15 +177,6 @@ private static String makeGroupId(boolean isAppendToExisting, String dataSource) @JsonIgnore private final AppenderatorsManager appenderatorsManager; - @JsonIgnore - private Thread runThread; - - @JsonIgnore - private boolean stopped = false; - - @JsonIgnore - private Appenderator appenderator; - @JsonCreator public IndexTask( @JsonProperty("id") final String id, @@ -421,16 +411,8 @@ public IndexIngestionSpec getIngestionSchema() } @Override - public TaskStatus run(final TaskToolbox toolbox) + public TaskStatus runTask(final TaskToolbox toolbox) { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } else { - runThread = Thread.currentThread(); - } - } - try { if (chatHandlerProvider.isPresent()) { log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); @@ -513,23 +495,6 @@ public TaskStatus run(final TaskToolbox toolbox) } } - @Override - public void stopGracefully(TaskConfig taskConfig) - { - synchronized (this) { - stopped = true; - // Nothing else to do for native batch except terminate - if (ingestionState != IngestionState.COMPLETED) { - if (appenderator != null) { - appenderator.closeNow(); - } - if (runThread != null) { - runThread.interrupt(); - } - } - } - } - private Map getTaskCompletionReports() { return TaskReport.buildTaskReports( @@ -935,7 +900,7 @@ private TaskStatus 
generateAndPublishSegments( final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { - this.appenderator = appenderator; + registerResourceCloserOnAbnormalExit(config -> appenderator.closeNow()); driver.startJob(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java index 1bbe4eeba303..8fcf252f39e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java @@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -138,6 +139,11 @@ public boolean isReady(TaskActionClient taskActionClient) } } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 45aabd8d33b3..eac71e4e5f09 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -161,8 +161,11 @@ default int getPriority() boolean canRestore(); /** - * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be terminated with - * extreme prejudice. 
+ * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be + * terminated with extreme prejudice. + * + * If the task has some resources to clean up on exit, e.g., sub tasks of parallel indexing task + * or Hadoop job of Hadoop indexing task, those resource cleanup should be done in this method. * * @param taskConfig TaskConfig for this task */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java new file mode 100644 index 000000000000..a96cd7009c58 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import org.apache.druid.indexing.common.config.TaskConfig; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.function.Consumer; + +/** + * Executes all registered {@link Consumer}s in LIFO order. 
+ * Similar to {@link org.apache.druid.java.util.common.io.Closer}, but this class is tweaked to be used in + * {@link Task#stopGracefully(TaskConfig)}. + */ +public class TaskResourceCleaner +{ + private final Deque> stack = new ArrayDeque<>(4); + + public void register(Consumer cleaner) + { + stack.addFirst(cleaner); + } + + public void clean(TaskConfig config) + { + Throwable throwable = null; + + // Clean up in LIFO order + while (!stack.isEmpty()) { + final Consumer cleaner = stack.removeFirst(); + try { + cleaner.accept(config); + } + catch (Throwable t) { + if (throwable == null) { + throwable = t; + } else { + suppress(throwable, t); + } + } + } + + if (throwable != null) { + throw new RuntimeException(throwable); + } + } + + private void suppress(Throwable thrown, Throwable suppressed) + { + //noinspection ObjectEquality + if (thrown != suppressed) { + thrown.addSuppressed(suppressed); + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index fb902cbe067d..69b92edfa184 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SurrogateAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider; import org.apache.druid.indexing.common.task.IndexTask; @@ -107,10 +106,6 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask private 
final IndexTaskClientFactory taskClientFactory; private final AppenderatorsManager appenderatorsManager; - private Appenderator appenderator; - private Thread runThread; - private boolean stopped = false; - /** * If intervals are missing in the granularitySpec, parallel index task runs in "dynamic locking mode". * In this mode, sub tasks ask new locks whenever they see a new row which is not covered by existing locks. @@ -205,16 +200,8 @@ public String getSupervisorTaskId() } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatus runTask(final TaskToolbox toolbox) throws Exception { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } else { - runThread = Thread.currentThread(); - } - } - if (missingIntervalsInOverwriteMode) { LOG.warn( "Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. " @@ -430,7 +417,7 @@ private Set generateAndPushSegments( final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { - this.appenderator = appenderator; + registerResourceCloserOnAbnormalExit(config -> appenderator.closeNow()); driver.startJob(); final Set pushedSegments = new HashSet<>(); @@ -536,18 +523,4 @@ private static BatchAppenderatorDriver newDriver( toolbox.getDataSegmentKiller() ); } - - @Override - public void stopGracefully(TaskConfig taskConfig) - { - synchronized (this) { - stopped = true; - if (appenderator != null) { - appenderator.closeNow(); - } - if (runThread != null) { - runThread.interrupt(); - } - } - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index c8cd3d9ddaff..8d266f54cd42 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; import org.apache.druid.indexing.common.task.IndexTask; @@ -128,8 +127,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private volatile ParallelIndexTaskRunner runner; private volatile IndexTask sequentialIndexTask; - private boolean stopped = false; - // toolbox is initlized when run() is called, and can be used for processing HTTP endpoint requests. 
private volatile TaskToolbox toolbox; @@ -287,20 +284,7 @@ public Granularity getSegmentGranularity() } @Override - public void stopGracefully(TaskConfig taskConfig) - { - synchronized (this) { - stopped = true; - } - if (runner != null) { - runner.stopGracefully(); - } else if (sequentialIndexTask != null) { - sequentialIndexTask.stopGracefully(taskConfig); - } - } - - @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatus runTask(TaskToolbox toolbox) throws Exception { if (missingIntervalsInOverwriteMode) { LOG.warn( @@ -357,38 +341,30 @@ void setToolbox(TaskToolbox toolbox) private TaskStatus runParallel(TaskToolbox toolbox) throws Exception { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } - createRunner(toolbox); - } + createRunner(toolbox); + registerResourceCloserOnAbnormalExit(config -> runner.stopGracefully()); return TaskStatus.fromCode(getId(), Preconditions.checkNotNull(runner, "runner").run()); } private TaskStatus runSequential(TaskToolbox toolbox) throws Exception { - synchronized (this) { - if (stopped) { - return TaskStatus.failure(getId()); - } - sequentialIndexTask = new IndexTask( - getId(), - getGroupId(), - getTaskResource(), - getDataSource(), - new IndexIngestionSpec( - getIngestionSchema().getDataSchema(), - getIngestionSchema().getIOConfig(), - convertToIndexTuningConfig(getIngestionSchema().getTuningConfig()) - ), - getContext(), - authorizerMapper, - chatHandlerProvider, - rowIngestionMetersFactory, - appenderatorsManager - ); - } + sequentialIndexTask = new IndexTask( + getId(), + getGroupId(), + getTaskResource(), + getDataSource(), + new IndexIngestionSpec( + getIngestionSchema().getDataSchema(), + getIngestionSchema().getIOConfig(), + convertToIndexTuningConfig(getIngestionSchema().getTuningConfig()) + ), + getContext(), + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory, + appenderatorsManager + ); + 
registerResourceCloserOnAbnormalExit(config -> sequentialIndexTask.stopGracefully(config)); if (sequentialIndexTask.isReady(toolbox.getTaskActionClient())) { return sequentialIndexTask.run(toolbox); } else { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java index 7879d8753569..d501b980d8cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.Task; @@ -68,6 +69,11 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) { @@ -96,6 +102,11 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java index 42c864163587..2ec6b1200e88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java @@ -69,6 +69,11 @@ public boolean isReady(TaskActionClient taskActionClient) return false; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + 
@Override public boolean requireLockExistingSegments() { @@ -95,7 +100,7 @@ public Granularity getSegmentGranularity() } @Override - public TaskStatus run(TaskToolbox toolbox) + public TaskStatus runTask(TaskToolbox toolbox) { return null; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java index 8724ba9c125a..40141412e8ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.java.util.common.Intervals; @@ -60,6 +61,11 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index f7dbfdee460a..475b8ef6f68f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -947,6 +947,11 @@ public String getType() return "test"; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { @@ -990,6 
+995,11 @@ public String getType() return "test"; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { @@ -1024,6 +1034,11 @@ public String getType() return "test"; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 004c15b3527e..db0fe75e155e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; @@ -1232,6 +1233,11 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 3a58c0cf6278..159192562798 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -31,6 +31,7 @@ import 
org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; @@ -1279,6 +1280,11 @@ public boolean isReady(TaskActionClient taskActionClient) return false; } + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + @Override public TaskStatus run(TaskToolbox toolbox) { From 77e21282da6c01820d0767bb187da5458eda59f2 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 3 Aug 2019 13:08:16 -0700 Subject: [PATCH 02/17] kill runner when it's ready --- .../apache/druid/indexing/common/task/HadoopIndexTask.java | 6 ++++-- .../java/org/apache/druid/indexing/common/task/Task.java | 2 +- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 8b7f653d5958..86ac882e87d7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -479,8 +479,10 @@ private void killHadoopJob() String hadoopJobIdFile = getHadoopJobIdFileName(); try { - ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(), - taskConfig.getDefaultHadoopCoordinates()); + ClassLoader loader = HadoopTask.buildClassLoader( + getHadoopDependencyCoordinates(), + taskConfig.getDefaultHadoopCoordinates() + ); Object killMRJobInnerProcessingRunner = getForeignClassloaderObject( "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index eac71e4e5f09..a6b7948631cc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -165,7 +165,7 @@ default int getPriority() * terminated with extreme prejudice. * * If the task has some resources to clean up on exit, e.g., sub tasks of parallel indexing task - * or Hadoop job of Hadoop indexing task, those resource cleanup should be done in this method. + * or Hadoop job of Hadoop indexing task, those resource cleanups should be done in this method. * * @param taskConfig TaskConfig for this task */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 8d266f54cd42..e6ef2c485c9f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -364,8 +364,8 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception rowIngestionMetersFactory, appenderatorsManager ); - registerResourceCloserOnAbnormalExit(config -> sequentialIndexTask.stopGracefully(config)); if (sequentialIndexTask.isReady(toolbox.getTaskActionClient())) { + registerResourceCloserOnAbnormalExit(config -> sequentialIndexTask.stopGracefully(config)); return sequentialIndexTask.run(toolbox); } else { return TaskStatus.failure(getId()); From 31faec2cf83e162c4f246cd1336d474d90d80397 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 3 Aug 2019 13:26:08 -0700 Subject: [PATCH 03/17] add comment --- 
.../druid/indexing/common/task/AbstractBatchIndexTask.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index f1cd3e76d220..6d77bb921674 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -113,6 +113,13 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception if (stopped) { return TaskStatus.failure(getId()); } else { + // Register the cleaner to interrupt the current thread first. + // Since the resource closer cleans up the registered resources in LIFO order, + // this will be executed last on abnormal exits. + // The order is sometimes important. For example, Appenderator has two methods of close() and closeNow(), and + // closeNow() is supposed to be called on abnormal exits. Interrupting the current thread could lead to close() + // to be called indirectly, e.g., for Appenderators in try-with-resources. In this case, closeNow() should be + // called before the current thread is interrupted, so that subsequent close() calls can be ignored.
resourceCloserOnAbnormalExit.register(config -> Thread.currentThread().interrupt()); } } From 13fd9094e05c0eec8721415186d5120a356a572f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 3 Aug 2019 14:36:27 -0700 Subject: [PATCH 04/17] kill run thread --- .../druid/indexing/common/task/AbstractBatchIndexTask.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 6d77bb921674..313479579189 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -120,7 +120,8 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception // closeNow() is supposed to be called on abnormal exits. Interrupting the current thread could lead to close() // to be called indirectly, e.g., for Appenderators in try-with-resources. In this case, closeNow() should be // called before the current thread is interrupted, so that subsequent close() calls can be ignored. 
- resourceCloserOnAbnormalExit.register(config -> Thread.currentThread().interrupt()); + final Thread currentThread = Thread.currentThread(); + resourceCloserOnAbnormalExit.register(config -> currentThread.interrupt()); } } return runTask(toolbox); From befedf625e43da8ff135d21f69a3c0d333c183ea Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 3 Aug 2019 16:39:03 -0700 Subject: [PATCH 05/17] fix test --- .../parallel/ParallelIndexSupervisorTaskKillTest.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 7d90a062a4af..8ddf6cc299fb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -40,11 +40,14 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.hamcrest.CoreMatchers; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import javax.annotation.Nullable; import java.io.IOException; @@ -52,12 +55,16 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.Stream; public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSupervisorTaskTest { + @Rule + public ExpectedException 
expectedException = ExpectedException.none(); + private ExecutorService service; @Before @@ -98,7 +105,9 @@ public void testStopGracefully() throws Exception Thread.sleep(100); } task.stopGracefully(null); - Assert.assertEquals(TaskState.FAILED, future.get()); + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class)); + future.get(); final TestParallelIndexTaskRunner runner = (TestParallelIndexTaskRunner) task.getRunner(); Assert.assertTrue(runner.getRunningTaskIds().isEmpty()); From 20a502d4084609337f2fc24b4311b6d29f815819 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 5 Aug 2019 15:30:38 -0700 Subject: [PATCH 06/17] Take closeable out of Appenderator --- .../AppenderatorDriverRealtimeIndexTask.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 19 ++++++++++++------- .../batch/parallel/ParallelIndexSubTask.java | 9 +++++++-- .../realtime/appenderator/Appenderator.java | 4 +--- ...DefaultOfflineAppenderatorFactoryTest.java | 8 ++++++-- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 3c22d2043c4f..564c965723f5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -437,7 +437,7 @@ public TaskStatus run(final TaskToolbox toolbox) } CloseQuietly.close(firehose); - CloseQuietly.close(appenderator); + appenderator.close(); CloseQuietly.close(driver); toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 25689263f459..8662cafda140 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -890,17 +890,16 @@ private TaskStatus generateAndPublishSegments( toolbox.getTaskActionClient() .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish)); + final Appenderator appenderator = newAppenderator( + buildSegmentsFireDepartmentMetrics, + toolbox, + dataSchema, + tuningConfig + ); try ( - final Appenderator appenderator = newAppenderator( - buildSegmentsFireDepartmentMetrics, - toolbox, - dataSchema, - tuningConfig - ); final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { - registerResourceCloserOnAbnormalExit(config -> appenderator.closeNow()); driver.startJob(); @@ -961,6 +960,7 @@ private TaskStatus generateAndPublishSegments( handleParseException(e); } } + appenderator.close(); final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); log.info("Pushed segments[%s]", pushed.getSegments()); @@ -996,8 +996,13 @@ private TaskStatus generateAndPublishSegments( } } catch (TimeoutException | ExecutionException e) { + appenderator.closeNow(); throw new RuntimeException(e); } + catch (RuntimeException e) { + appenderator.closeNow(); + throw e; + } } private void handleParseException(ParseException e) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index 69b92edfa184..9e54c547af87 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -412,12 +412,11 @@ private Set generateAndPushSegments( final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent(); final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient); + final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig); try ( - final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig); final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { - registerResourceCloserOnAbnormalExit(config -> appenderator.closeNow()); driver.startJob(); final Set pushedSegments = new HashSet<>(); @@ -479,6 +478,7 @@ private Set generateAndPushSegments( } } } + appenderator.close(); final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); pushedSegments.addAll(pushed.getSegments()); @@ -487,8 +487,13 @@ private Set generateAndPushSegments( return pushedSegments; } catch (TimeoutException | ExecutionException e) { + appenderator.closeNow(); throw new RuntimeException(e); } + catch (RuntimeException e) { + appenderator.closeNow(); + throw e; + } } private Appenderator newAppenderator( diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java index 7ad67ecc8afe..ce5ac8f89d81 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java @@ -29,7 +29,6 @@ import 
org.apache.druid.segment.incremental.IndexSizeExceededException; import javax.annotation.Nullable; -import java.io.Closeable; import java.util.Collection; import java.util.List; @@ -46,7 +45,7 @@ * all methods of the data appending and indexing lifecycle except {@link #drop} must be called from a single thread. * Methods inherited from {@link QuerySegmentWalker} can be called concurrently from multiple threads. */ -public interface Appenderator extends QuerySegmentWalker, Closeable +public interface Appenderator extends QuerySegmentWalker { /** * Return the name of the dataSource associated with this Appenderator. @@ -200,7 +199,6 @@ ListenableFuture push( * pushes to finish. This will not remove any on-disk persisted data, but it will drop any data that has not yet been * persisted. */ - @Override void close(); /** diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index b92bd1210422..fed1d705e9c9 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -154,11 +154,12 @@ public int columnCacheSizeBytes() null ); - try (Appenderator appenderator = defaultOfflineAppenderatorFactory.build( + Appenderator appenderator = defaultOfflineAppenderatorFactory.build( schema, tuningConfig, new FireDepartmentMetrics() - )) { + ); + try { Assert.assertEquals("dataSourceName", appenderator.getDataSource()); Assert.assertEquals(null, appenderator.startJob()); SegmentIdWithShardSpec identifier = new SegmentIdWithShardSpec( @@ -175,5 +176,8 @@ public int columnCacheSizeBytes() appenderator.close(); Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); } + finally { + appenderator.close(); + } 
} } From 6dbbf7138a0daab1cae843d73a81999d80063666 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 5 Aug 2019 20:39:43 -0700 Subject: [PATCH 07/17] add javadoc --- .../org/apache/druid/indexing/common/task/CompactionTask.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index e40b1eb009d9..4ad9cf6ab43c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -151,6 +151,9 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonIgnore private List indexTaskSpecs; + /** + * This variable is updated by the main thread and read by an HTTP thread when {@link #stopGracefully} is called. + */ @Nullable private volatile IndexTask currentRunningTaskSpec = null; From 4404c2efd42ddd722e6ab6abbd5b702d364bed5e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 6 Aug 2019 11:44:26 -0700 Subject: [PATCH 08/17] fix test --- .../java/org/apache/druid/indexing/common/task/IndexTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 8662cafda140..56cc62ec0f7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -960,7 +960,6 @@ private TaskStatus generateAndPublishSegments( handleParseException(e); } } - appenderator.close(); final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); log.info("Pushed segments[%s]", pushed.getSegments()); @@ -972,6 +971,7 @@ private TaskStatus generateAndPublishSegments( 
: null; // Probably we can publish atomicUpdateGroup along with segments. final SegmentsAndMetadata published = awaitPublish(driver.publishAll(inputSegments, publisher), pushTimeout); + appenderator.close(); ingestionState = IngestionState.COMPLETED; if (published == null) { From c13ebeb9a5153f552c295ab348a772a3ac5961b6 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 6 Aug 2019 15:45:26 -0700 Subject: [PATCH 09/17] fix test --- .../common/task/batch/parallel/ParallelIndexSubTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index 9e54c547af87..8cc56038d08a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -478,11 +478,11 @@ private Set generateAndPushSegments( } } } - appenderator.close(); final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); pushedSegments.addAll(pushed.getSegments()); LOG.info("Pushed segments[%s]", pushed.getSegments()); + appenderator.close(); return pushedSegments; } From a89b3fc26fd3eefca6f29b2412def7a71e165628 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 6 Aug 2019 17:32:11 -0700 Subject: [PATCH 10/17] update javadoc --- .../druid/indexing/common/task/CompactionTask.java | 5 ++++- .../java/org/apache/druid/indexing/common/task/Task.java | 9 ++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 4ad9cf6ab43c..d8e48e631854 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -152,7 +152,10 @@ public class CompactionTask extends AbstractBatchIndexTask private List indexTaskSpecs; /** - * This variable is updated by the main thread and read by an HTTP thread when {@link #stopGracefully} is called. + * The sub-task that is currently running. + * + * Volatile since it will potentially be accessed by {@link #stopGracefully} concurrently with {@link #runTask}, + * which is responsible for assigning the value. */ @Nullable private volatile IndexTask currentRunningTaskSpec = null; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index a6b7948631cc..c95a4ae27339 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -162,12 +162,15 @@ default int getPriority() /** * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be - * terminated with extreme prejudice. + * terminated with extreme prejudice. Note that this method can be called at any time while {@link #run} is called. + * Its implementations should handle potential concurrency issues properly. * - * If the task has some resources to clean up on exit, e.g., sub tasks of parallel indexing task - * or Hadoop job of Hadoop indexing task, those resource cleanups should be done in this method. + * If the task has some resources to clean up on exit, e.g., sub tasks of parallel indexing task or Hadoop job of + * Hadoop indexing task, those resource cleanups should be done in this method.
* * @param taskConfig TaskConfig for this task + * + * @see org.apache.druid.indexing.worker.http.WorkerResource#doShutdown(String) */ void stopGracefully(TaskConfig taskConfig); From 76f75fa9e808790eafb5100a845a4e1f0ae821b7 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 6 Aug 2019 20:43:48 -0700 Subject: [PATCH 11/17] add javadoc about killed task --- .../apache/druid/indexing/common/task/Task.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index c95a4ae27339..28197e1ed421 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -162,15 +162,21 @@ default int getPriority() /** * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be - * terminated with extreme prejudice. Note that this method can be called at any time while {@link #run} is called. - * Its implementations should handle potential concurrency issues properly. + * terminated with extreme prejudice. + * + * This method can be called at any time while {@link #run} is being called when the task is killed. + * Depending on the task executor type, one of the two cases below can happen when the task is killed. + * + * - When the task is executed by a middleManager, {@link org.apache.druid.indexing.overlord.ForkingTaskRunner} kills + * the process running the task, which triggers + * {@link org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner#stop}. + * - When the task is executed by an indexer, {@link org.apache.druid.indexing.overlord.ThreadingTaskRunner#shutdown} + * calls this method directly.
* * If the task has some resources to clean up on exit, e.g., sub tasks of parallel indexing task or Hadoop job of * Hadoop indexing task, those resource cleanups should be done in this method. * * @param taskConfig TaskConfig for this task - * - * @see org.apache.druid.indexing.worker.http.WorkerResource#doShutdown(String) */ void stopGracefully(TaskConfig taskConfig); From e43074721b43aa6027b5e686db17349c937a98f5 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 7 Aug 2019 12:24:15 -0700 Subject: [PATCH 12/17] address comment --- .../indexing/common/task/CompactionTask.java | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index d8e48e631854..b087c7c43e1c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -100,6 +100,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.StreamSupport; @@ -109,6 +110,12 @@ public class CompactionTask extends AbstractBatchIndexTask private static final Logger log = new Logger(CompactionTask.class); private static final String TYPE = "compact"; + /** + * A flag to indicate this task is already stopped and its child indexTasks shouldn't be created. + * See {@link #currentRunningTaskSpec} for more details. 
+ */ + private static final Object SPECIAL_VALUE_STOPPED = new Object(); + private final Interval interval; private final List segments; @Nullable @@ -152,13 +159,16 @@ public class CompactionTask extends AbstractBatchIndexTask private List indexTaskSpecs; /** - * The sub-task that is currently running. + * Reference to the sub-task that is currently running. * - * Volatile since it will potentially be accessed by {@link #stopGracefully} concurrently with {@link #runTask}, - * which is responsible for assigning the value. + * When {@link #stopGracefully} is called, the compaction task gets the reference to the current running task, + * and calls {@link #stopGracefully} for that task. This reference will be updated to {@link #SPECIAL_VALUE_STOPPED}. + * + * Note that {@link #stopGracefully} can be called at any time during {@link #run}. Calling {@link #stopGracefully} + * on the current running task and setting this reference to {@link #SPECIAL_VALUE_STOPPED} should be done atomically. */ @Nullable - private volatile IndexTask currentRunningTaskSpec = null; + private final AtomicReference currentRunningTaskSpec = new AtomicReference<>(); @JsonCreator public CompactionTask( @@ -340,17 +350,23 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception int failCnt = 0; registerResourceCloserOnAbnormalExit(config -> { - if (currentRunningTaskSpec != null) { - currentRunningTaskSpec.stopGracefully(config); + Object currentRunningTask = currentRunningTaskSpec.getAndSet(SPECIAL_VALUE_STOPPED); + if (currentRunningTask != null) { + ((IndexTask) currentRunningTask).stopGracefully(config); } }); for (IndexTask eachSpec : indexTaskSpecs) { + Object prevSpec = currentRunningTaskSpec.get(); + //noinspection ObjectEquality + if (prevSpec == SPECIAL_VALUE_STOPPED || !currentRunningTaskSpec.compareAndSet(prevSpec, eachSpec)) { + log.info("Task is asked to stop. 
Finish as failed."); + return TaskStatus.failure(getId()); + } final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); - log.info("Running indexSpec: " + json); try { if (eachSpec.isReady(toolbox.getTaskActionClient())) { - currentRunningTaskSpec = eachSpec; + log.info("Running indexSpec: " + json); final TaskStatus eachResult = eachSpec.run(toolbox); if (!eachResult.isSuccess()) { failCnt++; From cff9465c4ee0d38f6ccc466f7ee0cd841afe9caa Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 8 Aug 2019 13:40:37 -0700 Subject: [PATCH 13/17] handling missing exceptions --- .../druid/indexing/common/task/IndexTask.java | 14 +++++++++++--- .../task/batch/parallel/ParallelIndexSubTask.java | 14 +++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 56cc62ec0f7b..c314cef7f844 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -896,6 +896,7 @@ private TaskStatus generateAndPublishSegments( dataSchema, tuningConfig ); + boolean exceptionOccurred = false; try ( final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) @@ -996,13 +997,20 @@ private TaskStatus generateAndPublishSegments( } } catch (TimeoutException | ExecutionException e) { - appenderator.closeNow(); + exceptionOccurred = true; throw new RuntimeException(e); } - catch (RuntimeException e) { - appenderator.closeNow(); + catch (Exception e) { + exceptionOccurred = true; throw e; } + finally { + if (exceptionOccurred) { + appenderator.closeNow(); + } else { + appenderator.close(); + } + } } private void 
handleParseException(ParseException e) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index 8cc56038d08a..3e3cf80bc180 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -413,6 +413,7 @@ private Set generateAndPushSegments( final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient); final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig); + boolean exceptionOccurred = false; try ( final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) @@ -487,13 +488,20 @@ private Set generateAndPushSegments( return pushedSegments; } catch (TimeoutException | ExecutionException e) { - appenderator.closeNow(); + exceptionOccurred = true; throw new RuntimeException(e); } - catch (RuntimeException e) { - appenderator.closeNow(); + catch (Exception e) { + exceptionOccurred = true; throw e; } + finally { + if (exceptionOccurred) { + appenderator.closeNow(); + } else { + appenderator.close(); + } + } } private Appenderator newAppenderator( From 2e19f5bd365bf297d7e6b604935efa97b5624605 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 8 Aug 2019 14:26:17 -0700 Subject: [PATCH 14/17] more clear javadoc for stopGracefully --- .../java/org/apache/druid/indexing/common/task/Task.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java 
index 28197e1ed421..33cb47c2745d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -164,7 +164,10 @@ default int getPriority() * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be * terminated with extreme prejudice. * - * This method can be called at any time while {@link #run} is being called when the task is killed. + * This method can be called at any time while {@link #run} is being called when the task is killed. If this task + * is not started yet, that is {@link #run} is not called yet, this method will be never called. + * Once this task is started, this method can be called even after {@link #run} returns. Implementations of this + * method may want to avoid unnecessary work if {@link #run} already returned. * Depending on the task executor type, one of the two cases below can happen when the task is killed. * * - When the task is executed by a middleManager, {@link org.apache.druid.indexing.overlord.ForkingTaskRunner} kills @@ -173,8 +176,8 @@ default int getPriority() * - When the task is executed by an indexer, {@link org.apache.druid.indexing.overlord.ThreadingTaskRunner#shutdown} * calls this method directly. * - * If the task has some resources to clean up on exit, e.g., sub tasks of parallel indexing task or Hadoop job of - * Hadoop indexing task, those resource cleanups should be done in this method. + * If the task has some resources to clean up on abnormal exit, e.g., sub tasks of parallel indexing task + * or Hadoop jobs spawned by Hadoop indexing tasks, those resource cleanups should be done in this method. 
* * @param taskConfig TaskConfig for this task */ From 675299422467ab853fa4ffb3e949b4fafba26749 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 12 Aug 2019 13:21:00 -0700 Subject: [PATCH 15/17] update javadoc --- .../java/org/apache/druid/indexing/common/task/Task.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 33cb47c2745d..5dc8ee6e3b66 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -164,12 +164,10 @@ default int getPriority() * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be * terminated with extreme prejudice. * - * This method can be called at any time while {@link #run} is being called when the task is killed. If this task - * is not started yet, that is {@link #run} is not called yet, this method will be never called. - * Once this task is started, this method can be called even after {@link #run} returns. Implementations of this - * method may want to avoid unnecessary work if {@link #run} already returned. - * Depending on the task executor type, one of the two cases below can happen when the task is killed. + * Regardless when this method is called with respect to {@link #run}, its implementations must not allow a resource + * leak or lingering executions (local or remote). * + * Depending on the task executor type, one of the two cases below can happen when the task is killed. * - When the task is executed by a middleManager, {@link org.apache.druid.indexing.overlord.ForkingTaskRunner} kills * the process running the task, which triggers * {@link org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner#stop}. 
From 97c0f1228fe1f2d22be95c1d2d330ccebd5f7e67 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 12 Aug 2019 13:33:26 -0700 Subject: [PATCH 16/17] Add missing statement in javadoc --- .../java/org/apache/druid/indexing/common/task/Task.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 5dc8ee6e3b66..097bf7474ba8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -164,8 +164,9 @@ default int getPriority() * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be * terminated with extreme prejudice. * - * Regardless when this method is called with respect to {@link #run}, its implementations must not allow a resource - * leak or lingering executions (local or remote). + * This method can be called at any time no matter when {@link #run} is executed. Regardless of when this method is + * called with respect to {@link #run}, its implementations must not allow a resource leak or lingering executions + * (local or remote). * * Depending on the task executor type, one of the two cases below can happen when the task is killed. 
 * - When the task is executed by a middleManager, {@link org.apache.druid.indexing.overlord.ForkingTaskRunner} kills

From d3db7a6edf927c3726772daf0050225813ec0bf8 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Mon, 12 Aug 2019 14:51:26 -0700
Subject: [PATCH 17/17] typo

---
 .../druid/indexing/common/task/AbstractBatchIndexTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 313479579189..56dccbe83515 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -100,7 +100,7 @@ protected AbstractBatchIndexTask(
 }

 /**
- * Run this task. Before running the task, ithecks the the current task is already stopped and
+ * Run this task. Before running the task, it checks whether the current task is already stopped and
 * registers a cleaner to interrupt the thread running this task on abnormal exits.
 *
 * @see #runTask(TaskToolbox)