Non querying tasks shouldn't use processing buffers / merge buffers#16887
Non querying tasks shouldn't use processing buffers / merge buffers#16887LakshSingla merged 10 commits intoapache:masterfrom
Conversation
| command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask", storageSlot.getNumBytes()); | ||
|
|
||
| if (!task.supportsQueries()) { | ||
| // Processing threads, processing buffers and merging buffers are not required on tasks which |
There was a problem hiding this comment.
Nit: could you add the same comment in k8 task adapter as well.
|
Is it possible to do the logic in the peon itself rather than in the runners? i.e., if a peon is launched with a task that doesn't support queries, it doesn't create a merge pool or processing pool? That way, each way of launching a peon wouldn't need to be aware of this. |
I looked at the following approaches but didn't find a suitable one:
LMK if there's a way that I am missing. Otherwise, there's some duplication in the |
|
@LakshSingla , while I completely agree with not installing the |
| { | ||
| return ImmutableList.of( | ||
| new DruidProcessingModule(), | ||
| Modules.override(new DruidProcessingModule()).with( |
There was a problem hiding this comment.
Rather than doing Modules.override(), another option could be to write up a class TaskQueryProcessingModule extends DruidProcessingModule (or even just inline it here), where you could just call the super implementation, thus avoiding code duplication.
There was a problem hiding this comment.
Thanks for the idea!
| if (!task.supportsQueries()) { | ||
| return new ForwardingQueryProcessingPool(Execs.dummy()); | ||
| } | ||
| return new MetricsEmittingQueryProcessingPool( | ||
| PrioritizedExecutorService.create( | ||
| lifecycle, | ||
| config | ||
| ), | ||
| executorServiceMonitor | ||
| ); |
There was a problem hiding this comment.
Do not use a ForwardingQueryProcessingPool since it is not meant to be used anyway.
| if (!task.supportsQueries()) { | |
| return new ForwardingQueryProcessingPool(Execs.dummy()); | |
| } | |
| return new MetricsEmittingQueryProcessingPool( | |
| PrioritizedExecutorService.create( | |
| lifecycle, | |
| config | |
| ), | |
| executorServiceMonitor | |
| ); | |
| if (task.supportsQueries()) { | |
| return super.getProcessingPoolExecutor(args); | |
| } else { | |
| // I wonder if we shouldn't just throw an exception or return null here | |
| return DirectQueryProcessingPool.INSTANCE; | |
| } |
A similar simplification can be done for other methods too.
There was a problem hiding this comment.
null would look better than the direct processing pool since using the DirectQueryProcessingPool.INSTANCE looks wrong. IMO it means to do everything in the calling thread which isn't the expected behaviour. Also, I'll test if throwing an exception works, but I think that would cause guice initialization error.
since it is not meant to be used anyway
I didn't understand this part. Why should we not be using the ForwardingQueryProcessingPool. The benefit of my approach would be that the calling code wouldn't need to assume that the processing pool can be null anywhere, and handle that case separately. Moreover, it also acts as a safeguard in case any non querying task tries to submit a task to the pool, instead of complacently executing the task in the same thread (as with the direct processing pool).
There was a problem hiding this comment.
I didn't understand this part. Why should we not be using the ForwardingQueryProcessingPool. The benefit of my approach would be that the calling code wouldn't need to assume that the processing pool can be null anywhere, and handle that case separately. Moreover, it also acts as a safeguard in case any non querying task tries to submit a task to the pool, instead of complacently executing the task in the same thread (as with the direct processing pool).
I meant that if we know upfront that this task is not meant to use the query processing pool, then we should never return an instance that can be used at all, even if it causes the task to fail (since it was doing something illegal anyway).
I agree with your point about null.
How about we add a NoopQueryProcessingPool that throws Unsupported exception when anything is submitted to it?
There was a problem hiding this comment.
Also, I wish the QueryProcessingPool didn't extend ListeningExecutorService.
It would make for a cleaner interface and it would have been much easier to write dummy implementations.
Are the executor service methods ever called on the query processing pool?
There was a problem hiding this comment.
This is one usage of the processing pool as an executor service. Its javadoc also mentions such usages
I think a cleaner design would have been to have a method getExecutor() in the QueryProcessingPool interface. But since this is an @ExtensionPoint, I suppose we should leave it as is for now.
There was a problem hiding this comment.
then we should never return an instance that can be used at all, even if it causes the task to fail
The ForwardingQueryProcessingPool(Execs.dummy()) would do exactly that unless I am mistaken. The task would be delegated to the dummy executor which throws UOE on any attempt to submit the task.
I attempted to create a NoopQueryProcessingPool while raising the PR, but it was doing the same thing. Maybe I can rename and make it clearer to read, or subclass the forwarding pool explicitly.
There was a problem hiding this comment.
The ForwardingQueryProcessingPool(Execs.dummy()) would do exactly that unless I am mistaken. The task would be delegated to the dummy executor which throws UOE on any attempt to submit the task.
While this is true, there are small differences in using a dedicated NoopQueryProcessingPool:
- The intent is clearer to someone reading the code. Using the
Noopimplementation implies that it is meant to do nothing. Using aForwardingpool with a dummy executor could mean that it is supposed to have partial functionality. - The error message (and perhaps the stack trace too) would be more user-friendly. When using
Nooppool, the exception is thrown by the processing pool itself rather than the underlying dummy executor service.
That said, this is not a blocker for this PR as it is a style choice really.
There are already some quirks of the QueryProcessingPool interface that could use some cleanup. We could address this then.
| new Module() | ||
| { | ||
| @Override | ||
| public void configure(Binder binder) |
There was a problem hiding this comment.
Need not override this method if extending DruidProcessingModule.
The pool is created lazily, which is when the various query toolchests/runners/engines are created. The allocation of the buffer can or cannot be lazy depending on the type of the pool.
I have verified the above by looking at one of the controller logs, which shouldn't be using the buffers. |
|
Thanks for the suggestion, that is much better than what I was trying to achieve with the latest commit. |
|
@kfaraz |
This reverts commit 83085d4.
|
Ah, thanks for the clarification, @LakshSingla . Nice of Guice to give clear error messages.
You are too quick to jump between commits 😛 . There are still other things that can be done, like: Although, I think out of these two, option (a) is better. For now, do you think the above suggestion seems viable? |
|
I think the first one seems neat. Lemme try it out. |
kfaraz
left a comment
There was a problem hiding this comment.
Thanks for incorporating the feedback, @LakshSingla ! Left some more suggestions.
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
|
|
||
| public class ProcessingModuleHelper |
There was a problem hiding this comment.
You need not add a new class for the static methods. I think it is cleaner to just keep these methods in DruidProcessingModule itself. It would help with the review as well.
There was a problem hiding this comment.
After this change, DruidProcessingModule is more like the Historical+Indexer processing module. The same method for caching etc is copied everywhere. I feel that its neater to have it in a separate method, so that the methods can be used by other processing modules as well.
| /** | ||
| * Implementation of {@link QueryProcessingPool} that throws when it is given any query execution task unit | ||
| */ | ||
| public class NoopQueryProcessingPool extends ForwardingQueryProcessingPool |
There was a problem hiding this comment.
If we are writing a Noop implementation, it should not extend the Forwarding pool, rather implement the QueryProcessingPool directly and throw unsupported or equivalent exception in all methods.
kfaraz
left a comment
There was a problem hiding this comment.
Minor comments, rest looks good.
| public class NoopQueryProcessingPool implements QueryProcessingPool | ||
| { | ||
| private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); | ||
| private static final DruidException UNSUPPORTED_EXCEPTION = |
There was a problem hiding this comment.
I am not sure if keeping an exception constant is desirable. You can keep the exception message as a constant but throw a fresh exception wherever needed.
| private static final DruidException UNSUPPORTED_EXCEPTION = | ||
| DruidException.defensive("Unexpected call made to NoopQueryProcessingPool"); | ||
|
|
||
| public static QueryProcessingPool instance() |
There was a problem hiding this comment.
| public static QueryProcessingPool instance() | |
| public static NoopQueryProcessingPool instance() |
| */ | ||
| public class NoopQueryProcessingPool implements QueryProcessingPool | ||
| { | ||
| private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); |
There was a problem hiding this comment.
| private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); | |
| private static final NoopQueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); |
| if (!task.supportsQueries()) { | ||
| return DummyNonBlockingPool.instance(); | ||
| } | ||
| return DruidProcessingModule.createIntermediateResultsPool(config); |
There was a problem hiding this comment.
Maybe invert the condition for readability:
| if (!task.supportsQueries()) { | |
| return DummyNonBlockingPool.instance(); | |
| } | |
| return DruidProcessingModule.createIntermediateResultsPool(config); | |
| if (task.supportsQueries()) { | |
| return DruidProcessingModule.createIntermediateResultsPool(config); | |
| } else { | |
| return DummyNonBlockingPool.instance(); | |
| } |
There was a problem hiding this comment.
Same comment in other methods.
kfaraz
left a comment
There was a problem hiding this comment.
Thanks for the changes, @LakshSingla !
|
Tests are failing due to insufficient coverage of the changes made to the processing module. |
Description
Tasks that do not support querying or query processing i.e.
supportsQueries = falsedo not require processing threads, processing buffers, and merge buffers.The following tasks don't support queries -
Release note
Reduce the direct memory requirement on the non query processing tasks by not reserving the query buffers for those.
This PR has: