Fix join filter rewrites with nested queries #9978
jon-wei wants to merge 9 commits into apache:master from
Conversation
Marking WIP, need to adjust javadocs, will also see test coverage results

Added javadocs and fixed some inspections, removing WIP
suneet-s left a comment
I haven't looked through any of the tests yet. Since there are a lot of tradeoffs to be made, I think we should have a short-circuit in the JoinFilterPreAnalysisGroup that falls back to the old behavior, disabled by default.
This gives Druid operators a way to go back to the ~0.18.1 behavior in case there are any unforeseen issues with the pre-analysis filter caching, provided they are confident their queries will not run into the issue described in #9792
return withOverriddenContext(ImmutableMap.of(QueryContexts.LANE_KEY, lane));
}

default VirtualColumns getVirtualColumns()
I'm assuming this change has nothing to do with the bug fix, correct?
It was something failing inspections, since all the implementations of Query implement that method; it's only needed for the old rewrite mode (so I added this back in)
"Filter provided to cursor [%s] does not match join pre-analysis filter [%s]",
JoinFilterPreAnalysis jfpa;
if (filter == null) {
jfpa = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
Does this mean we'll re-compute the pre-analysis every time if the filter is null?
I think this code path is short-circuited right now, but it could change in the future and it would be hard to remember that the computation is not cached.
I adjusted the analysis group so the key is now (Filter, JoinableClauses, VirtualColumns) instead of just Filter, I think that's a more correct key, and the null filters are cached now.
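The composite cache key described here can be sketched as a small value class. This is a hypothetical illustration, not Druid's actual code: a Java record stands in for the key, and plain Strings/lists stand in for the Filter, JoinableClauses, and VirtualColumns types. A record derives equals/hashCode from all of its components, so two lookups with the same triple (including a null filter, since the wrapping key object itself is non-null) land on the same cache entry.

```java
import java.util.List;

// Hypothetical sketch of a (Filter, JoinableClauses, VirtualColumns) cache key.
// Strings and lists stand in for Druid's actual Filter/JoinableClauses/VirtualColumns.
// The generated equals/hashCode are null-safe, so a null filter is a valid key component.
public record PreAnalysisKey(String filter, List<String> joinableClauses, String virtualColumns)
{
  public static void main(String[] args)
  {
    PreAnalysisKey a = new PreAnalysisKey(null, List.of("j ON x = y"), "v0");
    PreAnalysisKey b = new PreAnalysisKey(null, List.of("j ON x = y"), "v0");
    System.out.println(a.equals(b)); // same triple, same cache entry
  }
}
```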
joinFilterPreAnalysisGroup.isEnableRewriteValueColumnFilters(),
joinFilterPreAnalysisGroup.getFilterRewriteMaxSize()
);
}
I think a better abstraction is to hide all of this logic in the JoinFilterPreAnalysisGroup. We shouldn't expose getAnalyses() to the other classes.
Instead, consider exposing a function like getPreAnalysisForFilter(Filter f) that provides the jfpa. Users of the API don't need to worry about how it works as long as we guarantee that this function is thread-safe.
Also, since the pre-analysis computation is expensive, the way this is written, I think it's still possible for all the threads on the historical to attempt to compute the preAnalysis, so we could see a spike in CPU usage.
Sorry I misread the javadocs. ConcurrentHashMap blocks on an operation on the same key when a computation is on-going, so this shouldn't be an issue.
From the javadocs:
> The entire method invocation is performed atomically, so the function is applied at most once per key. Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this map.
I've added a computeJoinFilterPreAnalysisIfAbsent and getAnalysis method to JoinFilterPreAnalysisGroup
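The caching approach discussed here follows the standard ConcurrentHashMap.computeIfAbsent pattern. Below is a minimal, self-contained sketch, not Druid's actual code: the class and method names are illustrative stand-ins for JoinFilterPreAnalysisGroup and its computeJoinFilterPreAnalysisIfAbsent, a String stands in for both the filter key and the expensive pre-analysis result, and the counter exists only to demonstrate that the computation runs at most once per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for JoinFilterPreAnalysisGroup: computeIfAbsent runs the
// expensive analysis atomically, at most once per key; concurrent callers for the
// same key block until the cached value is available.
public class PreAnalysisGroupCacheSketch
{
  private final ConcurrentHashMap<String, String> analyses = new ConcurrentHashMap<>();
  public final AtomicInteger computations = new AtomicInteger(); // demo-only counter

  // Stand-in for the expensive pre-analysis computation.
  private String computePreAnalysis(String filterKey)
  {
    computations.incrementAndGet();
    return "pre-analysis[" + filterKey + "]";
  }

  // Analogue of a computeJoinFilterPreAnalysisIfAbsent-style method.
  public String computeIfAbsent(String filterKey)
  {
    return analyses.computeIfAbsent(filterKey, this::computePreAnalysis);
  }

  public static void main(String[] args)
  {
    PreAnalysisGroupCacheSketch group = new PreAnalysisGroupCacheSketch();
    group.computeIfAbsent("x = 1");
    group.computeIfAbsent("x = 1"); // second lookup hits the cache
    System.out.println(group.computations.get()); // computed once per key
  }
}
```

One caveat with this pattern, noted in the quoted javadocs: computations inside computeIfAbsent can block other updates to the map, so a genuinely long-running analysis trades CPU duplication for potential contention on the map.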
private final JoinFilterCorrelations correlations;
private final boolean enableFilterPushDown;
private final boolean enableFilterRewrite;
private final JoinFilterPreAnalysisGroup myGroup;
Does this introduce a memory leak / a circular dependency of some kind?
The group holds a reference to the pre-analysis object via the analyses concurrent map, and the pre-analysis object holds a reference back to the group object via myGroup
Did some reading... Java's GC is smart enough to handle circular dependencies, so I think this should be ok.
I think it's fine, but I restructured this to store all the config parameters in a new JoinFilterRewriteConfig class, so there's no more reference to the group in the pre-analysis objects.
private final boolean enableFilterRewrite;
private final boolean enableRewriteValueColumnFilters;
private final long filterRewriteMaxSize;
private final ConcurrentHashMap<Filter, JoinFilterPreAnalysis> analyses;
Have you looked at the implementations of hashCode and equals for all filters? Are they efficient? What happens if the filter is something like an IN filter with a very large list of values? The hashCode check could be slower than deciding that the filter cannot be pushed down.
The main one I would be concerned with is InFilter, and to a lesser extent AND/OR.
I added lazy computation/caching for InFilter/AndFilter/OrFilter hashCodes. As long as the hashing is done once per query, and not once per segment, I wouldn't expect hashing overhead to outweigh the benefits of the rewrite (especially since such large filters would be expensive to apply per-row on the RHS).
I added a query context option and a separate set of methods for the old rewrite mode to JoinFilterPreAnalysisGroup, described as something available temporarily until the new mode is more battle-tested.
@@ -11486,79 +11485,75 @@ public void testTimeExtractWithTooFewArguments() throws Exception
@Parameters(source = QueryContextForJoinProvider.class)
public void testNestedGroupByOnInlineDataSourceWithFilterIsNotSupported(Map<String, Object> queryContext) throws Exception
heh, this method should probably be renamed to ...IsSupported now, I guess?
public int hashCode()
{
return fields != null ? fields.hashCode() : 0;
if (fieldsHashCode == null) {
Does this need to be thread-safe? Might want to put it behind a Supplier.memoize instead?
I changed this and elsewhere to use Supplier.memoize
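The memoized-hashCode pattern adopted here can be sketched as follows. The PR uses Guava's Suppliers.memoize; the memoize() helper below is a JDK-only stand-in showing the same idea (the class name, the stand-in fields, and the demo counter are all illustrative, not Druid's code): the hash is computed lazily, at most once, and safely under concurrent access.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch of a filter that caches its hashCode lazily (equals omitted for brevity).
// memoize() mimics Guava's Suppliers.memoize using double-checked locking.
public class MemoizedHashFilterSketch
{
  public final AtomicInteger computations = new AtomicInteger(); // demo-only counter
  private final Supplier<Integer> hashCodeSupplier;

  public MemoizedHashFilterSketch(List<String> fields)
  {
    this.hashCodeSupplier = memoize(() -> {
      computations.incrementAndGet();
      return fields.hashCode(); // the "expensive" computation, done at most once
    });
  }

  @Override
  public int hashCode()
  {
    return hashCodeSupplier.get();
  }

  // Minimal thread-safe memoizer, analogous to Guava's Suppliers.memoize.
  static <T> Supplier<T> memoize(Supplier<T> delegate)
  {
    return new Supplier<T>()
    {
      private volatile T value;

      @Override
      public T get()
      {
        T result = value;
        if (result == null) {
          synchronized (this) {
            result = value;
            if (result == null) {
              value = result = delegate.get();
            }
          }
        }
        return result;
      }
    };
  }

  public static void main(String[] args)
  {
    MemoizedHashFilterSketch f = new MemoizedHashFilterSketch(List.of("dim1", "dim2"));
    int h1 = f.hashCode();
    int h2 = f.hashCode();
    System.out.println(h1 == h2 && f.computations.get() == 1); // true
  }
}
```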
return subtotalsSpec;
}

@Override
Why this change? Query.java provides a default implementation, so doesn't it still @Override?
This was a change from removing the "old rewrite mode" initially, but now that it's added back in, I put the overrides back
* is kept temporarily available in case issues arise with the new mode, and the user does not run queries with the
* affected nested shape.
*/
public static <T> boolean getUseJoinFilterRewriteOldRewriteMode(Query<T> query)
Is it possible to detect this and use the different modes automatically, since it sounds like the old mode is perhaps better if there are no subqueries involved?
Hmm, I think that could be worth looking into later on
{
final List<String> sortedValues = new ArrayList<>(values);
sortedValues.sort(Comparator.nullsFirst(Ordering.natural()));
final Hasher hasher = Hashing.sha256().newHasher();
I know this isn't new, but might be worth investigating if there are faster hashes that are unique enough for this (not necessary in this PR, just thinking out loud)
// to ensure that the hashCode is only computed once per Filter since the Filter interface is not thread-safe.
synchronized (analyses) {
if (filter != null) {
filter.hashCode();
Oh, is this why we don't need thread-safety on Filter hashCode methods, I guess? This seems like kind of a funny way to prime the cached values; I think maybe the Supplier.memoize pattern would be a little cleaner and make this unnecessary?
Removed this after changing to supplier.memoize
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
);
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup();
}
AndFilter andFilter = (AndFilter) o;
return Objects.equals(getFilters(), andFilter.getFilters());
return Objects.equals(hashCode(), andFilter.hashCode());
equals should not compare hashCodes; it's possible for different AND filters to hash to the same hashCode.
Similar comments apply to the other filters.
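The point about equals versus hashCode can be shown with a tiny illustration (not Druid code): distinct values can share a hashCode, so an equals that compares hashCodes would wrongly treat them as equal. "Aa" and "BB" are a well-known String hashCode collision, so lists holding them collide too.

```java
import java.util.List;

// Demonstrates why equals must compare underlying fields rather than hashCodes:
// "Aa" and "BB" have identical String hashCodes (both 2112), so lists containing
// them have equal hashCodes while the lists themselves are clearly not equal.
public class EqualsVsHashCodeDemo
{
  public static void main(String[] args)
  {
    List<String> filtersA = List.of("Aa");
    List<String> filtersB = List.of("BB");
    System.out.println(filtersA.hashCode() == filtersB.hashCode()); // true (collision)
    System.out.println(filtersA.equals(filtersB));                  // false
  }
}
```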
I'm going to close this one and open a new PR with a different approach; this one has conflicts as well.
Opened a new PR: #10015
Fixes #9792 by moving the join filter pre-analysis into the makeCursors method of HashJoinSegmentStorageAdapter. This is done by introducing a new JoinFilterPreAnalysisGroup class, which holds a concurrent hash map of Filter -> JoinFilterPreAnalysis, used to avoid redundant computation of the JoinFilterPreAnalysis.

This PR has: