Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks#8236
jon-wei merged 17 commits into apache:master
| final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) | ||
| ) { | ||
| this.appenderator = appenderator; | ||
| registerResourceCloserOnAbnormalExit(config -> appenderator.closeNow()); |
hm, I just saw that Appenderator has the following javadoc:
Concurrency: all methods defined in this class directly, including {@link #close()} and {@link #closeNow()}, i.e. all methods of the data appending and indexing lifecycle except {@link #drop}, must be called from a single thread.
Maybe we should move the appenderator.closeNow() calls to an interrupt handler block in the main runTask methods and let interrupts through the resource closer trigger that.
Some of the appenderators are created in try-with-resources blocks, but AppenderatorImpl.close() checks whether it is already closed, so calling close() after closeNow() should be fine.
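A minimal sketch of the suggestion above, under stated assumptions: `SingleThreadResource` and `InterruptCloseDemo` are hypothetical names, not Druid classes, and the resource only mimics the idempotent close described for AppenderatorImpl. The point it illustrates is that the closer merely interrupts the owner thread, and the owner thread itself calls closeNow(), so the single-thread contract is kept.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a resource with a single-thread contract.
// This is NOT Druid's Appenderator; it only mimics an idempotent close.
final class SingleThreadResource {
    private boolean closed; // read and written by the owner thread only
    private final AtomicReference<Thread> closedBy = new AtomicReference<>();

    void closeNow() {
        if (!closed) {
            closed = true;
            closedBy.set(Thread.currentThread());
        }
    }

    // Idempotent: close() after closeNow() does nothing.
    void close() {
        closeNow();
    }

    Thread closedBy() {
        return closedBy.get();
    }
}

final class InterruptCloseDemo {
    // Runs the "task" on an owner thread, interrupts it from the caller, and
    // reports whether the resource was closed by the owner thread itself.
    static boolean closedByOwnerAfterInterrupt() {
        SingleThreadResource resource = new SingleThreadResource();
        CountDownLatch started = new CountDownLatch(1);
        Thread owner = new Thread(() -> {
            try {
                started.countDown();
                Thread.sleep(60_000); // stands in for the indexing work
            } catch (InterruptedException e) {
                resource.closeNow(); // abnormal exit, handled on the owner thread
            } finally {
                resource.close(); // normal-path close; a no-op after closeNow()
            }
        });
        owner.start();
        try {
            started.await();
            owner.interrupt(); // the closer only interrupts; it never touches the resource
            owner.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return resource.closedBy() == owner;
    }

    public static void main(String[] args) {
        System.out.println(closedByOwnerAfterInterrupt());
    }
}
```

The design choice here is that the thread calling the closer never needs to know anything about the resource; it only needs a handle to the owner thread.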
Good point. Will update the PR soon.
Took Closeable out of Appenderator. Callers of Appenderator should explicitly call close() or closeNow().
| private List<IndexTask> indexTaskSpecs; | ||
| @Nullable | ||
| private volatile IndexTask currentRunningTaskSpec = null; |
The added Javadoc doesn't answer the question "Why the semantics of volatile field reads and writes (as defined in the Java Memory Model) are required for the field?" from the referenced checklist item.
It's actually likely that just adding volatile to the field is either not enough to achieve certain concurrency goal (which is also not clarified in the Javadoc), or not needed.
"by an HTTP thread when {@link #stopGracefully} is called." - please point to the specific class and method where this happens, because it's not obvious.
The added Javadoc doesn't answer the question "Why the semantics of volatile field reads and writes (as defined in the Java Memory Model) are required for the field?" from the referenced checklist item.
I don't understand your comment. Do you want me to use the terms defined in the JMM?
It's actually likely that just adding volatile to the field is either not enough to achieve certain concurrency goal (which is also not clarified in the Javadoc), or not needed.
I don't think this kind of comment helps. So do you think volatile here is necessary or not?
Do you want me to use the terms defined in the JMM?
Yes, I think the term "happens-before" should usually appear in this type of explanatory comment.
So do you think volatile here is necessary or not?
I don't know because I wasn't able to figure out the place you referred to in "by an HTTP thread when {@link #stopGracefully} is called."
So if stopGracefully() may be called at any time concurrently with run(), I think the concurrency property that is worth ensuring is not starting the next task if stopGracefully() is already stopping the previous task. To achieve this, volatile is not enough. This property could be achieved the following way:
- currentRunningTaskSpec is AtomicReference<Object>
- The stopping callback:
    Object currentRunningTask = currentRunningTaskSpec.getAndSet(SPECIAL_VALUE_STOPPED);
    if (currentRunningTask != null) {
      ((IndexTask) currentRunningTask).stopGracefully(config);
    }
- The code in the loop in runTask() does something like
    Object prevSpec = currentRunningTaskSpec.get();
    if (prevSpec == SPECIAL_VALUE_STOPPED ||
        !currentRunningTaskSpec.compareAndSet(prevSpec, eachSpec)) {
      log.info("Stopped concurrently");
      ...
    }
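The construction above can be sketched as a small self-contained program. This is an illustration under assumptions, not the code that was merged: `SubTask` and `SequentialRunner` are hypothetical stand-ins for IndexTask and CompactionTask, and the sketch uses an `instanceof` check so that a repeated stop call does not try to cast the sentinel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for IndexTask; it only records what happened to it.
final class SubTask {
    private final Runnable body;
    final AtomicBoolean ran = new AtomicBoolean();
    final AtomicBoolean stopRequested = new AtomicBoolean();

    SubTask(Runnable body) {
        this.body = body;
    }

    void run() {
        ran.set(true);
        body.run();
    }

    void stopGracefully() {
        stopRequested.set(true);
    }
}

// Hypothetical stand-in for a task that runs sub-tasks one after another.
final class SequentialRunner {
    private static final Object STOPPED = new Object(); // sentinel value
    private final AtomicReference<Object> current = new AtomicReference<>();

    // May be called from any thread, at any point relative to runAll().
    void stopGracefully() {
        Object prev = current.getAndSet(STOPPED);
        if (prev instanceof SubTask) {
            ((SubTask) prev).stopGracefully();
        }
    }

    // Returns how many sub-tasks were started before a stop was observed.
    int runAll(List<SubTask> specs) {
        int started = 0;
        for (SubTask each : specs) {
            Object prev = current.get();
            // A failed compareAndSet means a stop slipped in after the get().
            if (prev == STOPPED || !current.compareAndSet(prev, each)) {
                break; // stopped concurrently: do not start the next sub-task
            }
            each.run();
            started++;
        }
        return started;
    }
}

final class SequentialRunnerDemo {
    public static void main(String[] args) {
        SequentialRunner runner = new SequentialRunner();
        SubTask first = new SubTask(() -> { });
        SubTask second = new SubTask(runner::stopGracefully); // stop arrives mid-run
        SubTask third = new SubTask(() -> { });
        System.out.println(runner.runAll(Arrays.asList(first, second, third)));
    }
}
```

Because both the stop path and the loop go through the same atomic reference, either the stop sees the sub-task that is about to run (and stops it), or the loop sees the sentinel (and does not start it); there is no window in which a new sub-task starts unnoticed.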
That construction definitely looks better!
Side note, I don't think this applies here, but in general do you think it's a valid explanation to say: "this field is accessed by multiple threads, and I haven't conclusively proven that volatile is not necessary, so I've included it"?
Probably a lot of the volatiles in the codebase today are like this. I don't necessarily see a big problem with it. Even though some of them could probably be safely removed, the 'just-in-case' volatiles still help lower the cognitive overhead of documenting and dealing with the codebase. (Otherwise, you'd need to think through these arguments again every time you make a change.)
To me, the main problem is that volatile is often used to delude oneself or reviewers that some code is concurrently safe without forcing through the threading/concurrent control flow models of the code, the desired properties of the code, and the proof that the code has the desired properties. If "volatile = probably worse perf, but no concurrency defects" was the case, that wouldn't be too bad, indeed. Unfortunately, in reality, what we have is "volatile = high probability that there are some concurrency defects around the code".
This is why https://github.com/code-review-checklists/java-concurrency#justify-volatile demands to justify volatile in JMM terms rather than in vague phrases.
Thanks, that's a good point. Fixed as suggested.
Unfortunately, in reality, what we have is "volatile = high probability that there are some concurrency defects around the code".
Interesting point. I wonder if we can nail down what usages of volatile are especially likely to be buggy. My intuitions would be:
- Not very risky: 'one-way' volatiles that are used to ensure safe publishing out of an 'owner' thread, but where the 'non-owner' threads aren't intended to call any mutation methods on the published object. This kind of pattern is sometimes used for publishing objects that monitoring or status-checking code will periodically inspect.
- Not very risky: volatiles used for deferred initialization (updated once from null -> nonnull, but not updated ever again).
- More risky: designs where a reference is updated multiple times by an 'owner' thread, and 'non-owner' threads may call some mutation methods on the object.
- Even more risky: designs where there is no clear 'owner' thread.
(The original example of currentRunningTaskSpec would have seemed risky by the above intuitions: there is a clear owner thread, but the reference is updated more than once, and non-owners are going to be calling mutation methods)
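As an illustration of the first "not very risky" case above, here is a minimal sketch of a one-way volatile with the kind of justification comment the checklist asks for. `Status` and `StatusPublisher` are hypothetical names invented for this example; the published object is immutable, so non-owner threads cannot call mutation methods on it.

```java
// Hypothetical immutable snapshot published by an owner thread.
final class Status {
    final int rowsProcessed;

    Status(int rowsProcessed) {
        this.rowsProcessed = rowsProcessed;
    }
}

// Hypothetical one-way publisher: one owner thread writes, monitors only read.
final class StatusPublisher {
    /**
     * Written only by the owner thread, read by monitoring threads.
     * volatile is needed because a write to a volatile field happens-before
     * every subsequent read of that field, so a monitor that observes the new
     * reference also observes everything the owner wrote before publishing it.
     * No compound (read-modify-write) action is ever performed on this field.
     */
    private volatile Status latest;

    // Owner thread only.
    void publish(int rowsProcessed) {
        latest = new Status(rowsProcessed);
    }

    // Any thread. Reads the field once so the null check and the use agree.
    int rowsSeenByMonitor() {
        Status snapshot = latest;
        return snapshot == null ? -1 : snapshot.rowsProcessed;
    }
}

final class StatusPublisherDemo {
    public static void main(String[] args) {
        StatusPublisher publisher = new StatusPublisher();
        publisher.publish(42);
        System.out.println(publisher.rowsSeenByMonitor());
    }
}
```

The riskier cases in the list differ exactly where this sketch is restrictive: once non-owner threads mutate the object or the reference is updated through check-then-act sequences, volatile alone no longer gives the intended property.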
| * | ||
| * @param taskConfig TaskConfig for this task | ||
| * | ||
| * @see org.apache.druid.indexing.worker.http.WorkerResource#doShutdown(String) |
Could mention that this shutdown API will cause stopGracefully to be called either through lifecycle stop (ForkingTaskRunner on MM -> process shutdown -> SingleTaskBackgroundRunner on Peon) or directly from an HTTP thread (ThreadingTaskRunner for Indexer).
Added some description about it.
| @@ -162,15 +162,21 @@ default int getPriority() | |||
Could you please add an overview section to Task's class-level Javadoc, describing which methods of this interface could be called concurrently with which other methods and which could not? In other words, provide an overview concurrent access documentation for this interface.
| * Its implementations should handle potential concurrency issues properly. | ||
| * terminated with extreme prejudice. | ||
| * | ||
| * This method can be called at any time while {@link #run} is being called when the task is killed. |
Could you specify what should or should not happen if stopGracefully() is called concurrently with (e.g. "before") run()? Does it guarantee that the task won't even start?
Good point. I added the below to javadoc.
This method can be called at any time while run() is being called when the task is killed. If this task is not started yet, that is run() is not called yet, this method will be never called. Once this task is started, this method can be called even after run() returns. Implementations of this method may want to avoid unnecessary work if run() already returned.
| * is not started yet, that is {@link #run} is not called yet, this method will be never called. | ||
| * Once this task is started, this method can be called even after {@link #run} returns. Implementations of this | ||
| * method may want to avoid unnecessary work if {@link #run} already returned. | ||
| * Depending on the task executor type, one of the two cases below can happen when the task is killed. |
It would be clearer if there was an empty line above this line and no empty line below this line.
| * | ||
| * This method can be called at any time while {@link #run} is being called when the task is killed. | ||
| * This method can be called at any time while {@link #run} is being called when the task is killed. If this task | ||
| * is not started yet, that is {@link #run} is not called yet, this method will be never called. |
This text sort of doesn't make sense, because the words like "yet", "before", "after" are not definable in terms of JMM which doesn't deal with time. Yet, in practical terms, currently, stopGracefully() can be called "before" run() in CompactionTask on one of the eachSpecs. So this contract is violated.
Good point. I think the contract now should say "this method can be called at any time no matter when run() is executed".
This text sort of doesn't make sense, because the words like "yet", "before", "after" are not definable in terms of JMM which doesn't deal with time.
I don't know what it should be to address your comment. What is your suggestion?
I think the contract now should say "this method can be called at any time no matter when run() is executed".
I agree.
I don't know what it should be to address your comment. What is your suggestion?
I would just put the phrase "Regardless of when stopGracefully() is called w.r.t. run(), the implementation must not allow a resource leak or lingering executions (local or remote)."
Missed the first sentence: "this method can be called at any time no matter when run() is executed".
Other than that, I don't have more comments about this PR. I didn't review it in full but don't block it.
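One way to meet the contract discussed above ("this method can be called at any time no matter when run() is executed") is to make closer registration and stopping symmetric. The sketch below is an assumption-laden illustration, not the TaskResourceCleaner added by this PR: `StopSafeCleaner` and its methods are hypothetical names. If the stop arrives first, a closer registered afterwards runs immediately; if the closer is registered first, the stop runs it exactly once.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; NOT the TaskResourceCleaner from the PR.
final class StopSafeCleaner {
    private final Object lock = new Object();
    private boolean stopped; // guarded by lock
    private Runnable closer; // guarded by lock

    // Called by run() once it holds a resource that must not leak.
    void register(Runnable newCloser) {
        boolean runNow;
        synchronized (lock) {
            runNow = stopped;
            if (!runNow) {
                closer = newCloser;
            }
        }
        if (runNow) {
            newCloser.run(); // stop already happened: clean up right away
        }
    }

    // May be called from any thread, before, during, or after run().
    void stopGracefully() {
        Runnable toRun;
        synchronized (lock) {
            stopped = true;
            toRun = closer;
            closer = null; // ensures the closer runs at most once
        }
        if (toRun != null) {
            toRun.run();
        }
    }
}

final class StopSafeCleanerDemo {
    public static void main(String[] args) {
        AtomicInteger closes = new AtomicInteger();
        StopSafeCleaner cleaner = new StopSafeCleaner();
        cleaner.stopGracefully();                 // stop arrives "before" run()
        cleaner.register(closes::incrementAndGet); // run() registers later
        System.out.println(closes.get());
    }
}
```

The closer is invoked outside the lock so that a slow or blocking cleanup cannot stall a concurrent register() or stopGracefully() call.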
jon-wei left a comment
LGTM (one more minor comment)
| } | ||
| /** | ||
| * Run this task. Before running the task, it checks whether the current task is already stopped and |
Description
There are a couple of bugs in CompactionTask and ParallelIndexSubTask. In CompactionTask, the currently running indexTask should be killed when the compactionTask is killed. In ParallelIndexSubTask, I think this line should be in the synchronized block.
To reduce both the possibility of mistakes and duplicate code, I added a couple of helper methods to AbstractBatchIndexTask. I also removed the default implementation of stopGracefully() because it should now be called properly for all production tasks.
This PR has: