Support maxSubqueryBytes for window functions #16800
LakshSingla merged 13 commits into apache:master from
Conversation
.put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
.put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "0")
why the need to set MAX_SUBQUERY_ROWS_KEY to 0?
we have the default as 100k and @Min(1) in the ServerConfig defaults
so without setting it like this it's not even possible to set it to 0
interestingly, the way this value is read from the context could even read negative values...
another interesting thing is that only <0 is considered as unset in ClientQuerySegmentWalker
so without setting it like this it's not even possible to set it to 0
With maxSubqueryBytes set, queries can still default to row-based limiting. For example, here: https://github.com/apache/druid/pull/16800/files#diff-788b8a750f09134a406d02535f5d06b106ce89e3aece072902af3362b310ac83L3. Even though maxSubqueryBytes was set, the query passed, despite window functions not supporting byte-based limiting.
we have the default as 100k and @Min(1) in the ServerConfig defaults
Server config defaults apply to the whole server, so it isn't good practice to set the default to something negative, which would be interpreted as unlimited. For individual queries, however, we can bypass it as needed.
I feel like we are not talking about the same thing.
I find it odd to set it to 0; meanwhile, through ServerConfig.defaults it's only allowed to be set to Min(1) - if 0 is a valid setting, then shouldn't it be Min(0) for the ServerConfig.defaults?
I see now - there isn't a check in the query context that disallows maxSubqueryBytes from being set to 0 via the query context. If it is negative, the ClientQuerySegmentWalker throws an exception; otherwise it doesn't. So 0 in the query context accidentally does what it means, even though the value is disallowed at the cluster level. Given that it has been like this for a long time, should we keep it as is for this PR? (I think negative -> unset is also weird, but any change to the semantics here might break queries on existing clusters.)
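The semantics being debated can be sketched as follows. This is a hypothetical illustration, not Druid's actual code: the class and method names are invented, and it models one reading of the thread (positive value limits rows, 0 slips through as "unlimited" because no context-level check disallows it, negative is rejected).

```java
// Hypothetical sketch of the per-query row-limit semantics discussed above.
// Not Druid's actual implementation; names and behavior are illustrative.
public class SubqueryRowLimit {
  // Resolve the effective limit; Integer.MAX_VALUE stands for "unlimited".
  public static int effectiveLimit(int contextValue) {
    if (contextValue < 0) {
      // modeled after "If it is negative, the ClientQuerySegmentWalker throws an exception"
      throw new IllegalArgumentException("maxSubqueryRows must be >= 0, got " + contextValue);
    }
    if (contextValue == 0) {
      return Integer.MAX_VALUE; // 0 accidentally means "no row limit"
    }
    return contextValue;
  }

  public static void main(String[] args) {
    assert effectiveLimit(0) == Integer.MAX_VALUE;
    assert effectiveLimit(5000) == 5000;
    try {
      effectiveLimit(-1);
      throw new AssertionError("expected rejection of negative value");
    } catch (IllegalArgumentException expected) {
      // negative values are disallowed at the query level in this sketch
    }
    System.out.println("ok");
  }
}
```

This makes the oddity concrete: the cluster-level `@Min(1)` validation and the per-query context path enforce different ranges, which is exactly the inconsistency the reviewer is pointing at.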
RowSignature rowSignature = resultArraySignature(query);
RowSignature modifiedRowSignature = useNestedForUnknownTypes
    ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
    : rowSignature;
this method has "result" in its name, but it seems like the result*Signature needed some fixups/checks; isn't this modifiedRowSignature the right signature for the query?
We had a problem of unknown types in the subqueries, which is largely resolved now. Theoretically, each primitive/array type can be serialized as a JSON type. Therefore, if somebody sets useNestedForUnknownColumnTypes, we replace columns without a type in the signature with the JSON type, so that they can be serialized as frames.
It is an undocumented parameter, and is only applicable for resultsAsFrames, since that relies on types being present.
Also, it was mostly experimental and for dev purposes only, because it mangled the data types.
{
  RowSignature rowSignature = resultArraySignature(query);
  RowSignature modifiedRowSignature = useNestedForUnknownTypes
      ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
I don't understand why this is needed or what it does...
I think it would be very useful to explain in the function's API doc why replaceUnknownTypesWithNestedColumns could be needed - and why that's beneficial.
imply-cheddar left a comment
This code needs to be improved and use the actual tools and implementations that already exist instead of perpetuating poor-performing code patterns.
@Override
public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
    WindowOperatorQuery query,
    Sequence<RowsAndColumns> resultSequence,
    MemoryAllocatorFactory memoryAllocatorFactory,
    boolean useNestedForUnknownTypes
)
{
Generally speaking, this method's implementation is entirely incorrect. RACs already know how to turn themselves into frames; this should just be doing a map over the incoming RAC, making it into a FrameMaker and then making the frame with that. It's just a simple transform over the sequence.
I agree that the code is doing things in a roundabout way. I did try using FrameMaker; however, due to the RowsAndColumnsUnravelingQueryRunner, the sequence of RACs is a sequence of rows. Therefore it throws a ClassCastException:
class [Ljava.lang.Object; cannot be cast to class org.apache.druid.query.rowsandcols.RowsAndColumns
Setting unravel to false in the ClientQuerySegmentWalker when we know the results will be materialized as frames poses the same problem when trying to materialize the results as rows via resultsAsArrays. The way I see it, the ClientQuerySegmentWalker should know if it's trying to materialize the results as arrays or frames, and set unravel to true/false accordingly.
This leaks the implementation of the window tool chest to the calling class (ClientQuerySegmentWalker), where it should have been abstracted away. Do let me know if that is fine, or if there's a better resolution.
FWIW here's the cleaned up implementation of resultsAsFrames in case I misinterpreted your comment:
{
  return Optional.of(
      resultSequence.map(
          rac -> {
            FrameMaker frameMaker = FrameMaker.fromRAC(rac);
            return new FrameSignaturePair(
                frameMaker.toColumnBasedFrame(),
                frameMaker.computeSignature()
            );
          }
      )
  );
}
The above is complicated by the fact that if we are not able to materialize the results as frames, we fall back and materialize them as arrays. So, if we have a "not-unraveled" result sequence because we were expecting the results as frames but had to fall back, then resultsAsArrays would also need to handle the case when it is "not-unraveled", or convert the Sequence<RowsAndColumns> back to Sequence<Object[]>, i.e. unravel it.
Okay, the overall problem here is the lifecycle of the resultsAsFrames method: it's disjointed from the overall lifecycle of the query itself, and there is no way for the query to control it.
The method would have been better done by making it actually wrap the QueryRunner instead of being a transformation of a Sequence to a Sequence. In that case, the QueryRunner would be able to know that it already returns frames, include the context parameter to skip the processing, and move on.
The other option, which requires a lot less code change for now, would be to add a context parameter like "subQuery": true, indicating that the query is being executed as a subquery instead of as the full query. I think that changing the method to wrap the QueryRunner is the "better" resolution, but the addition of the "subQuery" context parameter is also a viable and meaningful solution.
Generally speaking, all of the logic implemented in the QuerySegmentWalker that deals with data ends up problematic as we flesh out more query capabilities. We generally need to move more and more of that into Query implementations instead of the Walker.
There's the same trouble with the suggested resolution. There are two ways that a subquery's results can be materialized:
a. resultsAsArrays / List<Object[]> - for limiting the results by row number
b. resultsAsFrames / List - for limiting the results by memory size
(a) isn't required functionally, since Frames also contain knowledge of the number of rows. However, at the time (b) was added, we needed a way to:
- fall back if we encounter any difficulty while materializing the results as frames, allowing the user to transition
- allow the tool chests to transition gradually to resultsAsFrames, since it is an extension point and not all tool chests would have implemented the functionality to materialize the results as frames
"subquery": true doesn't contain all the information about whether we need to materialize the results as arrays or frames. For the former, the code is fine as is; however, with the latter we don't need to unravel.
I think one more mistake that the subquery materialization code made is over-reliance on the generic type, which doesn't work here.
You sparked another thought: it could be "serialization": "frame" on the context, in order to request that things are done as a frame. That would also work.
For the transition, the context flag about num bytes versus num rows is what determines which thing does what, so there's already a thing that describes how to do the transition. If we just blindly try one, fail, and then do the other, that will show up to users as a performance hit, because they have no clue that there's this rando intermediate logic that is failing for a reason. They will never inform us of the problem; they will just believe that the query is slow, when one of the causes could be that we are doing the same work twice. So, we should pick one strategy or the other and work with it.
In terms of allowing Toolchests to "opt-in": if the QueryRunner method returned null, then it would mean that the toolchest doesn't support it, so the only option would be to use the array-oriented method, which would be a fine fallback. For the subquery context flag, what you need is for the resultsAsArrays method to be able to handle both the Frame case and the non-frame case.
In general, the Query-processing interfaces (Toolchest and RunnerFactory) should really be defined in terms of QueryRunners. The problem is that the methods are defined in terms of Sequences, which exist after the query is executed. This means the methods have no way of communicating with the query processing in order to optimize/push down their logic, which is the fundamental problem you are running into: you need to communicate a decision to the query processing logic and have no method of doing that.
the context flag about num bytes versus num rows is what determines which thing does what, so there's a thing that already describes how to do the transition
I thought about this; however, we can also have a cluster-level config that determines the limit, so we should be looking at that as well in the window tool chest, and it seems uncool that the window tool chest has to determine what to do.
If we just blindly try one, fail and then do the other, that will show up to users as a performance hit because they have no clue that there's this rando intermediate logic that is failing for a reason
Fallback is mostly for when the types aren't known. I agree that it is a performance hit, but at the time this feature was added, the signature informed by the tool chest didn't need to have a type. Scan queries only had knowledge of the column names (and not types); group by/timeseries etc. tool chests could return null for the aggregators' dimensions. The fallback was present for these cases, where it's easy to detect the failure relatively early in the whole subquery processing flow. Fallback meant that transitioning from row- to byte-based limiting was simple. There's an undocumented parameter that treated these null types as JSON types, but that had logical flaws of its own, IIRC.
Removing the fallback would make the change much easier, and I have a lot more confidence now that the query doesn't need to fall back (we know the cases beforehand); however, I'd still like to keep it just in case for a while. I have an idea, and it depends on the fact that RACs can convert themselves to frames properly, so the window tool chest would never fall back.
Thanks for the help!! I can work with the "serialization": "frame" parameter as a workaround to the current design choices.
I thought about this, however, we can also have a cluster-level config that determines the limit, so we should be looking at that as well in the window tool chest, which seems uncool that the window tool chest has to determine what to do.
If you have a context parameter, you automatically have a cluster-level config as well: you can set the default context from cluster-level stuff. If we are creating a context parameter, we shouldn't have yet another random config to set things; that just makes it much harder to figure out what is set how.
Additionally, there's nothing wrong with the window query tool chest being asked what to do - that's good. The queries should be more involved in things, because that allows them to optimize stuff based on what they know they need to do. To wit, the whole reason you are having problems with this PR is that there was an attempt to take the decision away from the query. If the code had been written in a way that gave the query control in the first place, you wouldn't have this problem. If you are worried about "but then everyone has to implement the same thing": if it is truly the exact same implementation for every, single, call site, then it's incredibly easy to have a static helper that all of the call sites can use. Yes, each place has to call the helper, but the amount of time it takes to make a few places call a helper is significantly less than the amount of time it takes to work around an abstraction that isn't in the right place.
I have added a context parameter (ea1d57e#diff-a31fa1872294019b29fc9940740cd66774a6af7430f8b7f3d4bc95ce3b5d6380R28) and I wanted a buy-in about its usefulness.
Either
a. "serialization": "frames"/"rows"
or b. "subquery": true, along with maxSubqueryBytes/maxSubqueryRows
would be enough to disambiguate between the two ways in which the window tool chest can return the results.
We'd definitely need to add a flag, and the first one seems self-sufficient.
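Option (a) can be sketched like this. This is an illustrative stand-in, not Druid's actual context API: the enum, method names, and the defaulting behavior are assumptions made for the example.

```java
// Hypothetical sketch of reading a "serialization" context parameter that
// disambiguates rows-vs-frames materialization. Not Druid's actual classes.
import java.util.Locale;
import java.util.Map;

public class SerializationContext {
  public enum ResultSerialization { ROWS, FRAMES }

  // Read the hypothetical "serialization" key; default to ROWS for
  // backward compatibility with pre-existing row-based limiting.
  public static ResultSerialization fromContext(Map<String, Object> context) {
    Object value = context.get("serialization");
    if (value == null) {
      return ResultSerialization.ROWS;
    }
    return ResultSerialization.valueOf(value.toString().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    assert fromContext(Map.of()) == ResultSerialization.ROWS;
    assert fromContext(Map.of("serialization", "frames")) == ResultSerialization.FRAMES;
    System.out.println("ok");
  }
}
```

With a single self-describing flag like this, the tool chest can decide how to materialize results without also consulting a separate cluster-level limit config.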
case FRAMES:
  return baseRunner.run(queryPlus, responseContext);
case ROWS:
  return new RowsAndColumnsUnravelingQueryRunner(baseRunner).run(queryPlus, responseContext);
You are building a QueryRunner inside the run() method of a QueryRunner - don't do that. Instead, the QueryRunner should be able to do the right thing based on the query that it got. In this case, you likely need to rename the RowsAndColumnsUnravelingQueryRunner to RowsAndColumnsSerializationQueryRunner, and then make that query runner look at the context and do the right thing.
Additionally, when you look at this code, you will see that it's weird that, when it is asked for Frames, it returns a Sequence of RowsAndColumns. I understand that this is because some other code is going to try to handle this, but as we have learned, that method is problematic in and of itself. We should really move to a world where that method isn't necessary, and we can do that by building the frame up here.
So, I would suggest embedding the frame-building code here and keeping the other method as, effectively, a noop (the identity function).
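The reviewer's suggestion can be sketched with simplified stand-in types. This is not Druid's actual QueryRunner interface; the `Runner` interface, the list-based "sequence", and the context key are all illustrative, showing only the shape of the change: one runner that branches on the context inside run(), rather than constructing a second runner there.

```java
// Hypothetical sketch of a single serialization-aware runner. The types are
// simplified stand-ins for Druid's QueryRunner/Sequence/RowsAndColumns.
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SerializationQueryRunner {
  interface Runner { List<Object> run(Map<String, Object> context); }

  // Wrap a base runner that emits RAC-like batches. On "frames", pass the
  // batches through untouched; on anything else, unravel each batch into rows.
  static Runner wrap(Runner baseRunner, Function<Object, List<Object>> unravelRac) {
    return context -> {
      List<Object> racs = baseRunner.run(context);
      if ("frames".equals(context.get("serialization"))) {
        return racs; // keep RACs intact for frame materialization
      }
      return racs.stream()
                 .flatMap(rac -> unravelRac.apply(rac).stream())
                 .collect(Collectors.toList());
    };
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    Runner base = ctx -> List.of(List.of(1, 2), List.of(3)); // two small "RACs"
    Runner wrapped = wrap(base, rac -> (List<Object>) rac);
    assert wrapped.run(Map.of("serialization", "frames")).size() == 2; // 2 RACs
    assert wrapped.run(Map.of("serialization", "rows")).size() == 3;   // 3 rows
    System.out.println("ok");
  }
}
```

The point of the shape: the decision lives inside one runner and is driven by the query context, so no caller needs to construct a different runner per serialization mode.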
final Function<Query<T>, QueryRunner<T>> queryRunnerSupplier;

if (dryRun) {
  queryResults = Sequences.empty();
  queryRunnerSupplier = query -> (queryPlus, responseContext) -> Sequences.empty();
} else {
  final QueryRunner subqueryRunner = subQuery.getRunner(this);
  queryResults = subqueryRunner.run(
      QueryPlus.wrap(subQuery),
      DirectDruidClient.makeResponseContextForQuery()
  );
  queryRunnerSupplier = query -> query.getRunner(this);
I'm not sure you need this to be a Function<>; I think it can just be a QueryRunner<T>. It should be safe to get the QueryRunner here and then just pass in the query with the context when you actually run it.
That, or you can make the decision on what serialization format you are going to ask for ahead of this and add it to the context first. Then you probably don't even have to change any of the implementation below.
The reason for it being a function was that there's a QuerySwappingRunner which ensures that the query with which the runner was created is the same as the query with which it is being run, and modifying the context after creating the runner throws an exception.
The latter suggestion works, and I have modified the code to that effect.
int index = 0;
for (String columnName : rowSignature.getColumnNames()) {
  final Column column = rac.findColumn(columnName);
  if (column == null) {
I feel like this should not happen at all - could it be an exception instead?
Druid has a lot of places where the absence of a column is treated as all nulls, so leaving it as is is a less risky proposition. Can this happen if one of the segments doesn't contain a row, while the others do?
That said, this portion has been refactored as-is from the pre-existing code. I'll leave it to you to decide if this is applicable or not.
around here it seems like we already had that column in the signature; but if the RAC doesn't know about it, I think it's already inconsistent for some reason; it might be due to some other issue...
That said, this portion has been refactored as-is from the pre-existing code. I'll leave it to you to decide if this is applicable or not.
if we could add an exception and all tests pass I think we are better off with that 🤞
FrameMaker frameMaker = FrameMaker.fromRAC(rac);
return new FrameSignaturePair(
    frameMaker.toColumnBasedFrame(),
    frameMaker.computeSignature()
);
just a note
I feel like this part might not perform that well if it processes very small RACs, like 1-2 rows;
for example, toColumnBasedFrame allocates a 200MB area for producing the frame...
we could possibly improve on this later by adding an operator which could compact smaller RACs into bigger ones.
Yeah, we could have a recombining operator at the end that basically exists to recombine things based on whatever input boundaries would have existed. We should be able to make this "magically" happen; if it's not already, then it's just a question of what the stacktrace looks like and which concrete RAC classes are involved.
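The "compact smaller RACs into bigger ones" idea amounts to a batching transform. Here is a toy sketch under simplified assumptions: row batches are plain `List<Object[]>` rather than Druid's RowsAndColumns, and the threshold is a row count; both are illustrative choices, not Druid's API.

```java
// Toy sketch of recombining small row batches before frame allocation,
// so the per-frame allocation cost amortizes over larger inputs.
import java.util.ArrayList;
import java.util.List;

public class BatchCompactor {
  // Merge a stream of small batches into batches of at least minRows rows
  // (the trailing batch may be smaller).
  public static List<List<Object[]>> compact(List<List<Object[]>> batches, int minRows) {
    List<List<Object[]>> out = new ArrayList<>();
    List<Object[]> current = new ArrayList<>();
    for (List<Object[]> batch : batches) {
      current.addAll(batch);
      if (current.size() >= minRows) {
        out.add(current);
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      out.add(current); // flush the remainder
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Object[]>> tiny = List.of(
        List.of(new Object[]{1}), List.of(new Object[]{2}), List.of(new Object[]{3})
    );
    List<List<Object[]>> compacted = compact(tiny, 2);
    assert compacted.size() == 2;        // batches of rows {1,2} and {3}
    assert compacted.get(0).size() == 2;
    System.out.println("ok");
  }
}
```

A real operator would likely bound the batch by bytes rather than rows, but the control flow - accumulate, emit at threshold, flush remainder - is the same.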
resultSequence.set(results);
return Optional.empty();
why do we want to fall back - instead of telling the user that this has failed, and they might want to try row-based limiting?
I also feel like catching Exception is a bit broad, as it could catch NullPointerException or AIOOBE and such as well
as it could catch NullPointerException or AIOOBE and such as well
That won't happen, because startedAccumulating indicates that the query processing has begun. NullPointerException and AIOOBE will only happen if we have started processing the subquery results, which would mean that startedAccumulating = true. And if that's the case, for any random Exception, we rethrow it.
- that boolean is not true but will be caught in this block - which also contains toolChest#resultsAsFrames, which has a few implementations - any of which could throw an Exception and will be routed to a retry.
- isn't a possible problematic case if the signature contains a column starting with ___druid, which produces an IAE? TimeseriesQQTC#resultsAsFrames -> FrameWriters.makeColumnBasedFrameWriterFactory -> disallowedFieldNames.isEmpty() -> IAE?
could you please answer my original question:
why do we want to fall back - instead of telling the user that this has failed, and they might want to try row-based limiting?
why do we want to fall back - instead of telling the user that this has failed, and they might want to try row-based limiting?
Copied from one of my responses:
Fallback is mostly for when the types aren't known. At the time this feature was added, the signature informed by the tool chest didn't need to have a type. Scan queries only had knowledge of the column names (and not types); group by/timeseries etc. tool chests could return null for the aggregators' dimensions. The fallback was present for these cases, where it's easy to detect the failure relatively early in the whole subquery processing flow. Fallback meant that transitioning from row- to byte-based limiting was simple. There's an undocumented parameter that treated these null types as JSON types, but that had logical flaws of its own, IIRC.
your answer suggests that the situation has changed - couldn't it be changed to fall back early and not even try?
your answer suggests that the situation has changed
Yes, now the fallback cases are mostly known and preventable.
couldn't it be changed to fall back early and not even try?
I think it would require a change in the tool chests, since there isn't currently a method to get the row signature while specifying the finalization. Essentially, resultArraySignature would deem the aggregators' signature to have finalization = UNKNOWN, even though we want finalization = YES.
performance has slightly improved; there are some failing tests - the other tests have failed in the
if (expectedCell instanceof Object[]) {
  expectedCellCasted = (Object[]) expectedCell;
} else {
  expectedCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs;
}
why repeat this part twice? if it can be Object[] or a List, can't it be some other type?
if that's okay - why isn't this part of the coerceListToArray method?
if it can be Object[] or a List, can't it be some other type?
It's expected to be an Object[]. I have added the coercion since the selectors aren't implemented uniformly and some older expressions can return Lists. It can be other types (like Long[]), but I don't think it's necessary to cover those cases unless encountered, since that isn't the desired behavior.
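The coercion being discussed boils down to normalizing an expected array cell to `Object[]` before comparing. This is a hedged stand-alone sketch of that intent - the helper name is invented and it does not reproduce Druid's ExprEval.coerceListToArray behavior exactly.

```java
// Illustrative normalization of a test's expected array cell: accept either
// Object[] or List, reject anything else. Not Druid's actual helper.
import java.util.Arrays;
import java.util.List;

public class ArrayCellCoercion {
  public static Object[] coerce(Object cell) {
    if (cell instanceof Object[]) {
      return (Object[]) cell;
    }
    if (cell instanceof List) {
      return ((List<?>) cell).toArray();
    }
    throw new IllegalArgumentException("Expected Object[] or List, got " + cell.getClass());
  }

  public static void main(String[] args) {
    // Both representations normalize to the same Object[] contents.
    assert Arrays.equals(coerce(new Object[]{1L, 2L}), coerce(List.of(1L, 2L)));
    System.out.println("ok");
  }
}
```

Pulling the two branches into one helper like this would also answer the "why repeat this part twice?" question above.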
(Double) resultCell,
eps
);
} else if (expectedCell instanceof Object[] || expectedCell instanceof List) {
what's the difference between this block and the one above starting at line 995?
They are the same, I will refactor it to a private method.
kgyrtkirk left a comment
thank you for the updates and improvements @LakshSingla !
+1
Window queries now acknowledge maxSubqueryBytes.
Description
This PR adds an implementation of resultsAsFrames to the window tool chest. This allows subqueries involving window functions to be limited by maxSubqueryBytes, which is a better alternative to maxSubqueryRows for limiting the heap usage of a query.
Release note
Window queries now acknowledge maxSubqueryBytes.
This PR has: