diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index d6747d62b46b..e69f7f3d643d 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -279,6 +279,7 @@ Additional peon configs include: |`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0| +|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false| |`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index 06f7c00d68cc..fefc69039224 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -52,6 +52,9 @@ public class TaskConfig @JsonProperty private final List defaultHadoopCoordinates; + @JsonProperty + private final boolean restoreTasksOnRestart; + @JsonProperty private final Period gracefulShutdownTimeout; @@ -65,6 +68,7 @@ public TaskConfig( @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath, @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary, @JsonProperty("defaultHadoopCoordinates") List defaultHadoopCoordinates, + @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart, @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @JsonProperty("directoryLockTimeout") Period directoryLockTimeout ) @@ -76,6 +80,7 @@ public TaskConfig( this.defaultHadoopCoordinates = defaultHadoopCoordinates == null ? DEFAULT_DEFAULT_HADOOP_COORDINATES : defaultHadoopCoordinates; + this.restoreTasksOnRestart = restoreTasksOnRestart; this.gracefulShutdownTimeout = gracefulShutdownTimeout == null ? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT : gracefulShutdownTimeout; @@ -129,6 +134,12 @@ public List getDefaultHadoopCoordinates() return defaultHadoopCoordinates; } + @JsonProperty + public boolean isRestoreTasksOnRestart() + { + return restoreTasksOnRestart; + } + @JsonProperty public Period getGracefulShutdownTimeout() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 6a2042bb2911..83afb91b0292 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -149,7 +149,7 @@ public List>> restore() throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId()); } - if (task.canRestore()) { + if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) { log.info("Restoring task[%s].", task.getId()); retVal.add(Pair.of(task, run(task))); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 92a5843217bb..97d3961cef93 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -101,7 +101,7 @@ public void stop() final long elapsed; boolean error = false; - if (task.canRestore()) { + if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) { // Attempt graceful shutdown. graceful = true; log.info("Starting graceful shutdown of task[%s].", task.getId()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 75e163713785..5a6da2021264 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -92,7 +92,7 @@ public void setUp() throws IOException EasyMock.replay(task, mockHandoffNotifierFactory); taskToolbox = new TaskToolboxFactory( - new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null), + new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null), mockTaskActionClientFactory, mockEmitter, mockSegmentPusher, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 71ab5107d46f..f1b54df3edbc 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -455,7 +455,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId) private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory) { final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); - final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, null, null); + final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 53f409af36a9..0210067b170c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -204,7 +204,7 @@ public void deleteSegments(Set segments) EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), + new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null), tac, newMockEmitter(), new DataSegmentPusher() diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 23c391719cfe..7b28bd90a544 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -298,7 +298,7 @@ public RetType submit(TaskAction taskAction) throws IOExcepti SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), + new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null), new TaskActionClientFactory() { @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 26c3d168c7ee..fbed169867b1 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -451,7 +451,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) { - final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, null, null); + final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); mdc = newMockMDC(); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index aa29337f4e45..1b1ccbbf71b7 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -141,7 +141,16 @@ public String getBase() private WorkerTaskMonitor createTaskMonitor() { - final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null); + final TaskConfig taskConfig = new TaskConfig( + Files.createTempDir().toString(), + null, + null, + 0, + null, + false, + null, + null + ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class); EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes();