docs/content/configuration/indexing-service.md (1 addition & 0 deletions)
@@ -273,6 +273,7 @@ Additional peon configs include:
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
+|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
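
For reference, wiring the new flag into a middleManager's `runtime.properties` might look like the sketch below. The property names come from the table above; `PT5M` is simply the documented default, repeated for context:

```properties
# Stop restorable tasks gracefully on shutdown and restore them on restart
druid.indexer.task.restoreTasksOnRestart=true
# Upper bound on how long a restart waits for tasks to exit gracefully
druid.indexer.task.gracefulShutdownTimeout=PT5M
```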

TaskConfig.java

@@ -50,6 +50,9 @@ public class TaskConfig
@JsonProperty
private final List<String> defaultHadoopCoordinates;

+ @JsonProperty
+ private final boolean restoreTasksOnRestart;
+
@JsonProperty
private final Period gracefulShutdownTimeout;

@@ -63,6 +66,7 @@ public TaskConfig(
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
+ @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout
)
@@ -74,6 +78,7 @@ public TaskConfig(
this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
? DEFAULT_DEFAULT_HADOOP_COORDINATES
: defaultHadoopCoordinates;
+ this.restoreTasksOnRestart = restoreTasksOnRestart;
this.gracefulShutdownTimeout = gracefulShutdownTimeout == null
? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT
: gracefulShutdownTimeout;
@@ -127,6 +132,12 @@ public List<String> getDefaultHadoopCoordinates()
return defaultHadoopCoordinates;
}

+ @JsonProperty
+ public boolean isRestoreTasksOnRestart()
+ {
+   return restoreTasksOnRestart;
+ }
+
@JsonProperty
public Period getGracefulShutdownTimeout()
{
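
For illustration, constructing a `TaskConfig` with the new flag enabled might look like this sketch. The eight-argument shape mirrors the test change at the bottom of this diff; the meaning of the first two parameters is an assumption, since they are not shown in these hunks:

```java
// Hedged example, not part of the patch. Null arguments fall back to the
// defaults applied in the constructor above.
TaskConfig config = new TaskConfig(
    "/tmp/base",  // base directory (assumed meaning)
    null,         // base task directory (assumed meaning)
    null,         // hadoopWorkingPath -> default
    50000,        // defaultRowFlushBoundary
    null,         // defaultHadoopCoordinates -> default
    true,         // restoreTasksOnRestart, the new flag
    null,         // gracefulShutdownTimeout -> default PT5M
    null          // directoryLockTimeout -> default PT10M
);
assert config.isRestoreTasksOnRestart();
```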
RealtimeIndexTask.java

@@ -113,7 +113,13 @@ private static String makeDatasource(FireDepartment fireDepartment)
private volatile Firehose firehose = null;

@JsonIgnore
- private volatile boolean stopped = false;
+ private volatile boolean gracefullyStopped = false;
+
+ @JsonIgnore
+ private volatile boolean finishingJob = false;
+
+ @JsonIgnore
+ private volatile Thread runThread = null;

@JsonIgnore
private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;
@@ -170,16 +176,12 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
+ runThread = Thread.currentThread();
+
if (this.plumber != null) {
throw new IllegalStateException("WTF?!? run with non-null plumber??!");
}

- // Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire
- // them if we actually need them
- for (final TaskLock taskLock : getTaskLocks(toolbox)) {
-   toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval()));
- }
-
boolean normalExit = true;

// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
@@ -309,7 +311,7 @@ public String getVersion(final Interval interval)
committerSupplier = Committers.supplierFromFirehose(firehose);

// Time to read data!
- while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) {
+ while ((!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
final InputRow inputRow;

try {
@@ -346,39 +348,55 @@ public String getVersion(final Interval interval)
finally {
  if (normalExit) {
    try {
-     if (!stopped) {
-       // Hand off all pending data
-       log.info("Persisting and handing off pending data.");
-       plumber.persist(committerSupplier.get());
-       plumber.finishJob();
-     } else {
-       log.info("Persisting pending data without handoff, in preparation for restart.");
-       final Committer committer = committerSupplier.get();
-       final CountDownLatch persistLatch = new CountDownLatch(1);
-       plumber.persist(
-           new Committer()
-           {
-             @Override
-             public Object getMetadata()
-             {
-               return committer.getMetadata();
-             }
-
-             @Override
-             public void run()
-             {
-               try {
-                 committer.run();
-               }
-               finally {
-                 persistLatch.countDown();
-               }
-             }
-           }
-       );
-       persistLatch.await();
-     }
+     // Always want to persist.
+     log.info("Persisting remaining data.");
+
+     final Committer committer = committerSupplier.get();
+     final CountDownLatch persistLatch = new CountDownLatch(1);
+     plumber.persist(
+         new Committer()
+         {
+           @Override
+           public Object getMetadata()
+           {
+             return committer.getMetadata();
+           }
+
+           @Override
+           public void run()
+           {
+             try {
+               committer.run();
+             }
+             finally {
+               persistLatch.countDown();
+             }
+           }
+         }
+     );
+     persistLatch.await();
+
+     if (gracefullyStopped) {
+       log.info("Gracefully stopping.");
+     } else {
+       log.info("Finishing the job.");
+       synchronized (this) {
+         if (gracefullyStopped) {
+           // Someone called stopGracefully after we checked the flag. That's okay, just stop now.
+           log.info("Gracefully stopping.");
+         } else {
+           finishingJob = true;
+         }
+       }
+
+       if (finishingJob) {
+         plumber.finishJob();
+       }
+     }
    }
+   catch (InterruptedException e) {
+     log.debug(e, "Interrupted while finishing the job");
+   }
    catch (Exception e) {
      log.makeAlert(e, "Failed to finish realtime task").emit();
      throw e;
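
The persist-then-await shape above is a standard way to turn an asynchronous callback into a synchronous step: the plumber invokes the wrapped Committer's run() when the persist commits, and the latch blocks the task thread until that happens. A self-contained sketch of the same idiom, using plain Java in place of the Druid types:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    final CountDownLatch persistLatch = new CountDownLatch(1);
    final ExecutorService exec = Executors.newSingleThreadExecutor();

    // Stand-in for plumber.persist(committer): the work completes via a
    // callback on another thread rather than a return value.
    exec.submit(
        new Runnable()
        {
          @Override
          public void run()
          {
            try {
              System.out.println("persisting...");
            }
            finally {
              persistLatch.countDown(); // what the wrapped Committer does
            }
          }
        }
    );

    persistLatch.await(); // block until the callback has fired
    exec.shutdown();
  }
}
```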
@@ -406,13 +424,17 @@ public void stopGracefully()
{
  try {
    synchronized (this) {
-     if (!stopped) {
-       stopped = true;
-       log.info("Gracefully stopping.");
-       if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+     if (!gracefullyStopped) {
+       gracefullyStopped = true;
+       if (finishingJob) {
+         log.info("stopGracefully: Interrupting finishJob.");
+         runThread.interrupt();
+       } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+         log.info("stopGracefully: Draining firehose.");
          firehose.close();
        } else {
-         log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose);
+         log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
+         runThread.interrupt();
        }
      }
    }
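
stopGracefully() pairs a volatile flag with a targeted interrupt: a task draining input is stopped by closing the firehose, and anything else is interrupted out of whatever it is blocked on. A stand-alone sketch of this cooperative-shutdown idiom, with illustrative names rather than code from the patch:

```java
public class CooperativeWorker implements Runnable
{
  private volatile boolean gracefullyStopped = false;
  private volatile Thread runThread = null;

  @Override
  public void run()
  {
    runThread = Thread.currentThread();
    try {
      while (!gracefullyStopped) {
        Thread.sleep(100); // stand-in for blocking work, e.g. reading input
      }
    }
    catch (InterruptedException e) {
      // stopGracefully() interrupted a blocking call; exit quietly.
    }
  }

  public synchronized void stopGracefully()
  {
    if (!gracefullyStopped) {
      gracefullyStopped = true;
      if (runThread != null) {
        runThread.interrupt(); // wake the worker if it is blocked
      }
    }
  }
}
```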
ForkingTaskRunner.java

@@ -146,7 +146,7 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
}

- if (task.canRestore()) {
+ if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
log.info("Restoring task[%s].", task.getId());
retVal.add(Pair.of(task, run(task)));
}
TaskLockbox.java

@@ -87,8 +87,10 @@ public void syncFromStorage()

try {
// Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
+ final Set<String> storedActiveTasks = Sets.newHashSet();
final List<Pair<Task, TaskLock>> storedLocks = Lists.newArrayList();
for (final Task task : taskStorage.getActiveTasks()) {
+ storedActiveTasks.add(task.getId());
for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
storedLocks.add(Pair.of(task, taskLock));
}
@@ -108,6 +110,7 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
};
running.clear();
activeTasks.clear();
+ activeTasks.addAll(storedActiveTasks);
// Bookkeeping for a log message at the end
int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
@@ -118,7 +121,6 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
- activeTasks.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock(
task,
savedTaskLock.getInterval(),
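
(Net effect of this pair of hunks: every task that taskStorage reports as active is registered in activeTasks up front, rather than only the tasks whose locks are successfully reacquired, so an active task that holds no locks, or whose lock reacquisition fails, is no longer dropped from the active set.)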
ThreadPoolTaskRunner.java

@@ -98,7 +98,7 @@ public void stop()
final long elapsed;
boolean error = false;

- if (task.canRestore()) {
+ if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
// Attempt graceful shutdown.
graceful = true;
log.info("Starting graceful shutdown of task[%s].", task.getId());
@@ -88,7 +88,7 @@ public void setUp() throws IOException
EasyMock.replay(task, mockHandoffNotifierFactory);

taskToolbox = new TaskToolboxFactory(
- new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null),
+ new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null),
mockTaskActionClientFactory,
mockEmitter,
mockSegmentPusher,
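
(The added `false` is the new restoreTasksOnRestart argument, slotted between defaultHadoopCoordinates and gracefulShutdownTimeout to match the updated constructor; passing false keeps the test's previous behavior.)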