Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/configuration/indexing-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ Additional peon configs include:
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class TaskConfig
@JsonProperty
private final List<String> defaultHadoopCoordinates;

@JsonProperty
private final boolean restoreTasksOnRestart;

@JsonProperty
private final Period gracefulShutdownTimeout;

Expand All @@ -65,6 +68,7 @@ public TaskConfig(
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
@JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout
)
Expand All @@ -76,6 +80,7 @@ public TaskConfig(
this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
? DEFAULT_DEFAULT_HADOOP_COORDINATES
: defaultHadoopCoordinates;
this.restoreTasksOnRestart = restoreTasksOnRestart;
this.gracefulShutdownTimeout = gracefulShutdownTimeout == null
? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT
: gracefulShutdownTimeout;
Expand Down Expand Up @@ -129,6 +134,12 @@ public List<String> getDefaultHadoopCoordinates()
return defaultHadoopCoordinates;
}

@JsonProperty
public boolean isRestoreTasksOnRestart()
{
return restoreTasksOnRestart;
}

@JsonProperty
public Period getGracefulShutdownTimeout()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
}

if (task.canRestore()) {
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
log.info("Restoring task[%s].", task.getId());
retVal.add(Pair.of(task, run(task)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void stop()
final long elapsed;
boolean error = false;

if (task.canRestore()) {
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
// Attempt graceful shutdown.
graceful = true;
log.info("Starting graceful shutdown of task[%s].", task.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void setUp() throws IOException
EasyMock.replay(task, mockHandoffNotifierFactory);

taskToolbox = new TaskToolboxFactory(
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null),
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null),
mockTaskActionClientFactory,
mockEmitter,
mockSegmentPusher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId)
private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory)
{
final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, null, null);
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void deleteSegments(Set<DataSegment> segments)
EasyMock.replay(notifierFactory);

final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null),
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
tac,
newMockEmitter(),
new DataSegmentPusher()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOExcepti
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null),
new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
new TaskActionClientFactory()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException

private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher)
{
final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, null, null);
final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts);
mdc = newMockMDC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,16 @@ public String getBase()

private WorkerTaskMonitor createTaskMonitor()
{
final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null);
final TaskConfig taskConfig = new TaskConfig(
Files.createTempDir().toString(),
null,
null,
0,
null,
false,
null,
null
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.<Task>anyObject())).andReturn(taskActionClient).anyTimes();
Expand Down