Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -969,10 +969,11 @@ private StageOutputHolder getOrCreateStageOutputHolder(final StageId stageId, fi

/**
* Returns cancellation ID for a particular stage, to be used in {@link FrameProcessorExecutor#cancel(String)}.
* In addition to being a token for cancellation, this also appears in thread dumps, so make it a little descriptive.
*/
private static String cancellationIdFor(final StageId stageId, final int workerNumber)
{
return StringUtils.format("%s_%s", stageId, workerNumber);
return StringUtils.format("msq-worker[%s_%s]", stageId, workerNumber);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ private Optional<ReturnOrAwait<T>> runProcessorNow()
}
}

final String threadName = Thread.currentThread().getName();
boolean canceled = false;
Either<Throwable, ReturnOrAwait<T>> retVal;

Expand All @@ -230,6 +231,11 @@ private Optional<ReturnOrAwait<T>> runProcessorNow()
throw new InterruptedException();
}

if (cancellationId != null) {
// Set the thread name to something involving the cancellationId, to make thread dumps more useful.
Thread.currentThread().setName(threadName + "-" + cancellationId);
}

retVal = Either.value(processor.runIncrementally(readableInputs));
}
catch (Throwable e) {
Expand All @@ -253,6 +259,9 @@ private Optional<ReturnOrAwait<T>> runProcessorNow()
canceled = true;
}
}

// Restore original thread name.
Thread.currentThread().setName(threadName);
}
}

Expand Down