Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ private static String makeDatasource(FireDepartment fireDepartment)
private volatile Firehose firehose = null;

@JsonIgnore
private volatile boolean stopped = false;
private volatile boolean gracefullyStopped = false;

@JsonIgnore
private volatile boolean finishingJob = false;

@JsonIgnore
private volatile Thread runThread = null;

@JsonIgnore
private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;
Expand Down Expand Up @@ -172,16 +178,12 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
runThread = Thread.currentThread();

if (this.plumber != null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering whether it's better to skip releasing locks only if the task is restorable.
Would it be better to always release any old locks when the task is not restorable (just to be on the safer side)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a reason to release locks here, since:

  • we won't have any when we first start up (since a while ago, tasks don't get retried if they fail)
  • after a second startup (restore) we really do want to keep the locks as dropping them will confuse us

throw new IllegalStateException("WTF?!? run with non-null plumber??!");
}

// Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire
// them if we actually need them
for (final TaskLock taskLock : getTaskLocks(toolbox)) {
toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval()));
}

boolean normalExit = true;

// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
Expand Down Expand Up @@ -313,7 +315,7 @@ public String getVersion(final Interval interval)
committerSupplier = Committers.supplierFromFirehose(firehose);

// Time to read data!
while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) {
while ((!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
final InputRow inputRow;

try {
Expand Down Expand Up @@ -350,39 +352,55 @@ public String getVersion(final Interval interval)
finally {
if (normalExit) {
try {
if (!stopped) {
// Hand off all pending data
log.info("Persisting and handing off pending data.");
plumber.persist(committerSupplier.get());
plumber.finishJob();
} else {
log.info("Persisting pending data without handoff, in preparation for restart.");
final Committer committer = committerSupplier.get();
final CountDownLatch persistLatch = new CountDownLatch(1);
plumber.persist(
new Committer()
// Always want to persist.
log.info("Persisting remaining data.");

final Committer committer = committerSupplier.get();
final CountDownLatch persistLatch = new CountDownLatch(1);
plumber.persist(
new Committer()
{
@Override
public Object getMetadata()
{
@Override
public Object getMetadata()
{
return committer.getMetadata();
}
return committer.getMetadata();
}

@Override
public void run()
{
try {
committer.run();
}
finally {
persistLatch.countDown();
}
@Override
public void run()
{
try {
committer.run();
}
finally {
persistLatch.countDown();
}
}
);
persistLatch.await();
}
);
persistLatch.await();

if (gracefullyStopped) {
log.info("Gracefully stopping.");
} else {
log.info("Finishing the job.");
synchronized (this) {
if (gracefullyStopped) {
// Someone called stopGracefully after we checked the flag. That's okay, just stop now.
log.info("Gracefully stopping.");
} else {
finishingJob = true;
}
}

if (finishingJob) {
plumber.finishJob();
}
}
}
catch (InterruptedException e) {
log.debug(e, "Interrupted while finishing the job");
}
catch (Exception e) {
log.makeAlert(e, "Failed to finish realtime task").emit();
throw e;
Expand Down Expand Up @@ -410,13 +428,17 @@ public void stopGracefully()
{
try {
synchronized (this) {
if (!stopped) {
stopped = true;
log.info("Gracefully stopping.");
if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
if (!gracefullyStopped) {
gracefullyStopped = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happened to the log line?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It split in two and moved into the conditional below.

if (finishingJob) {
log.info("stopGracefully: Interrupting finishJob.");
runThread.interrupt();
} else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
log.info("stopGracefully: Draining firehose.");
firehose.close();
} else {
log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose);
log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
runThread.interrupt();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ public void syncFromStorage()

try {
// Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
final Set<String> storedActiveTasks = Sets.newHashSet();
final List<Pair<Task, TaskLock>> storedLocks = Lists.newArrayList();
for (final Task task : taskStorage.getActiveTasks()) {
storedActiveTasks.add(task.getId());
for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
storedLocks.add(Pair.of(task, taskLock));
}
Expand All @@ -111,6 +113,7 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
};
running.clear();
activeTasks.clear();
activeTasks.addAll(storedActiveTasks);
// Bookkeeping for a log message at the end
int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
Expand All @@ -121,7 +124,6 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
activeTasks.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock(
task,
savedTaskLock.getInterval(),
Expand Down
Loading