Move Caching Cluster Client to java streams and allow parallel intermediate merges #5913
drcrallen wants to merge 71 commits into apache:master
Conversation
This reverts commit f80a5bc.
nice PR description :)
private static final AtomicLong fjpWorkerThreadCount = new AtomicLong(0L);
Using LongAdder would be better than AtomicLong here. There is a source-level analysis on my personal blog; if you are interested, you can take a look.
Tips: https://yuzhouwan.com/posts/31915#LongAdder
// Requires: java.util.concurrent.ExecutorService, java.util.concurrent.Executors,
// java.util.concurrent.atomic.AtomicLong, java.util.concurrent.atomic.LongAdder
// JVM flags: -Xmx512M -Xms512M -Xmn256M -XX:+AlwaysPreTouch -ea
@Test
public void pressureLongAdder() throws Exception {
final LongAdder longAdder = new LongAdder();
ExecutorService executorService = Executors.newCachedThreadPool();
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
executorService.submit(new Thread(() -> {
for (int j = 0; j < 1000_0000; j++) {
longAdder.increment();
}
System.out.print(String.format("%s %s \t", Thread.currentThread().getId(), longAdder.longValue()));
/*
14 19607585 12 36445036 20 38985288 38 76821270 70 117094732 18 127252576
22 137043349 26 153411172 30 164051380 34 165971155 102 192241678 134 201104979
158 232657818 46 279030056 174 288502545 94 347965290 198 348060553 118 348087414
36 353092712 28 357762215 44 365464475 126 379518198 54 379623515 182 380077075
142 385263911 78 389013887 62 389085727 110 389122678 86 389920423 166 393535019
150 396382512 190 403100499 32 403161217 208 403197689 206 406065520 16 410725026
24 415347205 40 415379997 48 415733397 104 418507295 192 423244160 176 455793362
168 458311865 160 463028656 136 496375440 72 541243645 186 561877000 170 575352229
162 584152392 154 604552121 138 614092854 64 638151890 114 668705836 58 669235250
188 699213410 156 729222401 124 754336889 100 784326386 76 813479501 120 827569944
66 830236567 98 832153503 112 841408676 204 849520891 210 852391130 202 864804732
172 875603834 194 877222893 200 881090909 88 882809513 80 882846368 56 887174571
178 889682247 140 901357028 146 902169049 184 904540678 152 915608988 130 917896629
116 924616135 144 927674541 122 930399321 128 939791111 106 942656234 84 950848174
96 951904067 90 954910184 74 964338213 196 966487766 82 968307139 52 975854400
180 977385398 164 978882525 50 980896807 148 988292352 132 989090669 108 996891232
92 996921398 42 996938988 68 996953941 60 1000000000
*/
}));
}
executorService.shutdown();
while (!executorService.isTerminated()) {
Thread.sleep(1);
}
long endTime = System.currentTimeMillis();
System.out.println("\n" + (endTime - startTime)); // 3275 ms
}
// -Xmx512M -Xms512M -Xmn128M -XX:+AlwaysPreTouch -ea
@Test
public void pressureAtomicLong() throws Exception {
final AtomicLong atomicLong = new AtomicLong();
ExecutorService executorService = Executors.newCachedThreadPool();
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
executorService.submit(new Thread(() -> {
for (int j = 0; j < 1000_0000; j++) {
atomicLong.getAndIncrement();
}
System.out.print(String.format("%s %s \t", Thread.currentThread().getId(), atomicLong.longValue()));
/*
12 390000000 28 390000000 44 390000000 20 390000000 26 390000000 18 390000000
80 390000000 56 390000000 96 390000000 24 390000000 88 390000000 72 390000000
22 390000000 118 390000000 54 390000000 142 390000000 70 390000000 86 390000000
182 390000000 110 390000000 62 390000000 78 390000000 102 390000000 158 390000000
150 390000000 46 390000000 38 390000000 126 390000000 94 390000000 134 390000000
14 390000000 48 390000000 40 390000000 32 390000000 34 390000000 64 390000000
42 390000000 36 390000000 16 390000000 180 416396554 204 419908287 196 425536497
92 732203658 30 733835560 202 733835559 210 733873571 146 733878564 186 733883527
170 733888686 76 733892691 84 733888815 148 733901560 162 733907032 172 733908079
52 733913280 116 733918421 124 733906868 164 733920945 132 733891348 68 733923672
108 733924928 156 733926091 60 733921998 140 733927257 188 733928891 154 733871822
194 733830477 178 733872527 100 733830322 106 748251688 144 1000000000 98 1000000000
58 1000000000 90 1000000000 130 1000000000 138 1000000000 114 1000000000 104 1000000000
168 1000000000 200 1000000000 184 1000000000 160 1000000000 174 1000000000 112 1000000000
190 1000000000 198 1000000000 82 1000000000 206 1000000000 166 1000000000 176 1000000000
136 1000000000 208 1000000000 74 1000000000 122 1000000000 152 1000000000 192 1000000000
120 1000000000 128 1000000000 66 1000000000 50 1000000000
*/
}));
}
executorService.shutdown();
while (!executorService.isTerminated()) {
Thread.sleep(1);
}
long endTime = System.currentTimeMillis();
System.out.println("\n" + (endTime - startTime)); // 19409 ms
}

Had to change back to AtomicLong because of a race condition; LongAdder was not quite the right solution here.
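To make the trade-off concrete, here is a minimal sketch of why an `AtomicLong` fits a counter that must hand out unique values (such as naming FJP worker threads), while `LongAdder` does not. The class and method names are hypothetical, for illustration only:

```java
import java.util.concurrent.atomic.AtomicLong;

public class FjpThreadNamer {
    // Mirrors the counter from the diff above (hypothetical wrapper class).
    static final AtomicLong fjpWorkerThreadCount = new AtomicLong(0L);

    static String nextThreadName() {
        // getAndIncrement is one atomic read-modify-write, so every caller
        // receives a distinct index. LongAdder only offers increment() plus
        // a separate longValue() read, so two interleaving threads can
        // observe the same value: fine for statistics, wrong for unique ids.
        return "fjp-worker-" + fjpWorkerThreadCount.getAndIncrement();
    }

    public static void main(String[] args) {
        System.out.println(nextThreadName()); // fjp-worker-0
        System.out.println(nextThreadName()); // fjp-worker-1
    }
}
```

`LongAdder` remains the better choice when the counter is only ever summed at the end, such as for metrics.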
    .dataSource(DATA_SOURCE)
    .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval)))
    .context(ImmutableMap.<String, Object>of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0="))
    .context(ImmutableMap.<String, Object>of(

ImmutableMap.<String, Object>of can be ImmutableMap.of
That one was caught in a source code refactor, I think :) I'll fix.
@leventov I'm worried some of your comments might be on commits instead of the PR, and might be lost in the UI for me to discover. I think I responded to all the comments. Can you please call out, in a fresh comment on this PR, any that are missing responses so I can figure out how they were missed?
return runAndMergeWithTimelineChange(
    query,
    // No change, but Function.identity() doesn't work here for some reason
    identity -> identity
UnaryOperator.identity() works
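For reference, a minimal standalone sketch of the suggestion (not the actual Druid code): `UnaryOperator<T>` extends `Function<T, T>`, and its `identity()` is typed as `UnaryOperator<T>`, which often satisfies signatures where the plain `Function.identity()` fails to infer.

```java
import java.util.function.UnaryOperator;

public class IdentityDemo {
    public static void main(String[] args) {
        // Equivalent to the `identity -> identity` lambda in the diff:
        UnaryOperator<String> noChange = UnaryOperator.identity();
        System.out.println(noChange.apply("timeline")); // prints "timeline"
    }
}
```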
return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
    }
};
return runAndMergeWithTimelineChange(
ah I see now, nevermind, fixing
 */
private <T> Sequence<T> run(      // old signature
@VisibleForTesting
<T> Stream<Sequence<T>> run(      // new signature
According to the call order, run() should be placed after runAndMergeWithTimelineChange()
);
return MergeWorkTask.parallelMerge(
    sequences.parallel(),
    sequenceStream ->
Please add type to this variable for readability
queryRunnerFactory.mergeRunners(
    mergeFjp,
    sequenceStream.map(
        s -> (QueryRunner<T>) (ignored0, ignored1) -> (Sequence<T>) s
Extracting a static factory method QueryRunner.returnConstant() would be more readable
Adding a simple function to make this part of the code cleaner.
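A sketch of the suggested `returnConstant`-style factory, using a simplified stand-in interface rather than Druid's real `QueryRunner` (the interface and method names below are hypothetical):

```java
import java.util.stream.Stream;

public class ConstantRunnerDemo {
    // Simplified stand-in for the two-argument QueryRunner functional shape.
    @FunctionalInterface
    interface Runner<T> {
        Stream<T> run(Object queryPlus, Object responseContext);
    }

    // The suggested static factory: wrap an existing result so it satisfies
    // the runner interface, ignoring both arguments.
    static <T> Runner<T> returnConstant(Stream<T> result) {
        return (ignored0, ignored1) -> result;
    }

    public static void main(String[] args) {
        Runner<String> runner = returnConstant(Stream.of("a", "b"));
        System.out.println(runner.run(null, null).count()); // 2
    }
}
```

Compared with the inline double-cast lambda in the diff, the factory gives the pattern a name and keeps the cast in one place.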
.filter(Objects::nonNull)
.collect(Collectors.toList());

// We should only ever have cache or queries to run, not both. So if we have no segments, try caches
If so, could you replace ServerMaybeSegmentMaybeCache with two nullable fields with two different classes, both having only two fields?
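The shape of that suggestion, with hypothetical names (the real class holds a server plus a nullable segment and a nullable cached value): instead of one class with several nullable fields, split it into two small classes with two non-null fields each, so each code path handles exactly one shape.

```java
public class SplitUnionDemo {
    // A server that still needs its segment queried. (Hypothetical names;
    // String stands in for the real server/segment types.)
    static class ServerAndSegment {
        final String server;
        final String segment;
        ServerAndSegment(String server, String segment) {
            this.server = server;
            this.segment = segment;
        }
    }

    // A server whose result was already found in cache.
    static class ServerAndCachedValue {
        final String server;
        final byte[] cachedValue;
        ServerAndCachedValue(String server, byte[] cachedValue) {
            this.server = server;
            this.cachedValue = cachedValue;
        }
    }

    public static void main(String[] args) {
        ServerAndSegment toQuery = new ServerAndSegment("historical-1", "segment-1");
        ServerAndCachedValue cached = new ServerAndCachedValue("historical-2", new byte[]{1});
        System.out.println(toQuery.server + " " + cached.server);
    }
}
```

The null checks then disappear from the callers, because "which case am I in" is encoded in the type rather than in which field happens to be null.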
// See io.druid.java.util.common.guava.MergeSequenceTest.testScrewsUpOnOutOfOrder for an example
// With zero results actually being found (no segments no caches) this should essentially return a no-op
// merge sequence
return new MergeSequence<>(query.getResultOrdering(), Sequences.fromStream(
Should be
return new MergeSequence<>(
query.getResultOrdering(),
Sequences.fromStream(
segmentOrResult
.stream()
.map(ServerMaybeSegmentMaybeCache::getCachedValue)
.filter(Objects::nonNull)
.map(Collections::singletonList)
.map(Sequences::simple)
)
);
segmentOrResult
    .stream()
    .map(ServerMaybeSegmentMaybeCache::getCachedValue)
    .filter(Objects::nonNull)
Don't nulls here break the assumptions?
.stream()
.map(ServerMaybeSegmentMaybeCache::getCachedValue)
.filter(Objects::nonNull)
.map(Collections::singletonList)
.map(result -> Sequences.simple(Collections.singletonList(result))) would be clearer (or add Sequence.singleton())
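With plain streams, the mapping being discussed looks like the sketch below, using `List` in place of Druid's `Sequence` (so `Collections.singletonList` plays the role of the suggested `Sequence.singleton()`):

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SingletonSketch {
    // Each non-null cached value becomes its own one-element "sequence";
    // List stands in here for Druid's Sequence.
    static List<List<String>> toSingletons(Stream<String> cachedValues) {
        return cachedValues
            .filter(Objects::nonNull)
            .map(Collections::singletonList)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(toSingletons(Stream.of("cachedA", null, "cachedB")));
        // [[cachedA], [cachedB]]
    }
}
```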
 *
 * @return A stream of potentially cached results per server
 */
Phew, package rename is pushed; fixing unit tests, and then I'll get to Roman's comments.

Got clobbered by #6313, fixing.

This Travis fail looks legit. Please check.

Also, are there unit tests for this feature?

I'm digging more. Trying to get this PR back up to the top of my priority list and address Roman's comments.

@drcrallen thanks. Also please add some JMH benchmarks.

@jihoonson what kind of info are you wanting for the JMH stuff?
@drcrallen basically I want to check how large a performance benefit the new parallel merge algorithm can give us. Even though you mentioned that it shows about an 85% performance improvement in some internal tests, that shows the overall performance benefit, not the algorithm itself. Of course, the overall performance is more important, but the latter is also important because it gives us insight into what we can expect exactly from this feature. Also, query performance is affected by many factors like dataSource size, query filter selectivity, number of aggregators and their types, cluster size, and so on. JMH would be useful because we can easily replicate the performance benchmark with the same query and the same data, which makes it easy to maintain or improve in the future. I think the JMH benchmark should include the below:
@drcrallen hmm, maybe there's some misunderstanding. As you said, this is still an experimental feature, and exact numbers are not very important at this point. However, we need to add a benchmark implementation with which we can run performance tests, so that we can better understand the implementation of the parallel merge algorithm in this PR and how to further improve it. Does it make sense?
For those finding this later: there are problems with GroupBy queries that are not appropriately addressed here. I think the split between handling the CachingClusteredClient changes (which may or may not be needed) and enabling GroupBy result merging to work better in parallel are really two different PRs and need to be addressed separately. I'm leaving the PR open for now as a reference, but it is unlikely to see any more development, in favor of pursuing a more sustainable (and GroupBy-compatible) approach.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
This pull request/issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
We are seeing severe broker bottlenecks for certain types of queries. The existing Sequence paradigm does a folding reduction whereby the results from all the historicals and real-time nodes are folded into a single accumulator. This accumulation can be a major bottleneck in terms of performance for these large queries. TopN queries with results on order 10k returning from hundreds of nodes can take many seconds (in excess of 40s in some cases) to do the merge on the broker! A cpu-time flame graph of such a scenario can be seen below:
In an attempt to "fix" this bottleneck, I was having a hard time following the logic in CachingClusteredClient. So I re-wrote a bunch of the logic in java 8 streams, attempting to retain as much of the original functionality as possible. I also added many more comments as to why different sections were doing what they are doing. As a killer addition, a new query context `intermediateMergeBatchThreshold` is added which allows parallel intermediate merges.

Intermediate merges
The Sequences produced by the druid client connections are collected in a Stream. If the query context `intermediateMergeBatchThreshold` is specified, then Spliterators are forked off of the Sequence Stream in batches according to the threshold, if possible. Failure to do so reverts to the prior merge behavior. These Sequences are fully materialized by working in a ForkJoinPool, and the results are accumulated (a reduce-fold-left kind of operation) on a first-available basis. All of the results fetched from cache form another Sequence that gets merged in.

Why Fork Join Pool?
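The batching step can be sketched as follows. This is a rough illustration of forking fixed-size batches off a stream via its `Spliterator`, not the PR's actual `MergeWorkTask` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class BatchForkSketch {
    // Pull elements off the stream's Spliterator and cut a batch every time
    // the threshold is reached; any remainder becomes a final short batch.
    static <T> List<List<T>> batches(Stream<T> s, int threshold) {
        Spliterator<T> split = s.spliterator();
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        while (split.tryAdvance(current::add)) {
            if (current.size() >= threshold) {
                out.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(batches(IntStream.rangeClosed(1, 7).boxed(), 3));
        // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

In the real code each batch would then be handed to the ForkJoinPool as an intermediate merge task rather than collected into lists.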
The "work" for the intermediate merges is done in a dedicated ForkJoinPool. A fork-join pool was used instead of an ExecutorService in order to accommodate dynamic expansion of the worker pool; basically, to use a worker setup slightly better than running all the folding accumulation in the http server thread. The forking and joining semantics are used, but no attempt is made to account for the effects of potentially blocking operations (like QTL). As such, a dedicated pool of resources was chosen with the hope that it can eventually be more intelligent about being kind when doing potentially blocking operations. This is no worse than a fixed-pool executor service and, when done properly, can spin up new threads to circumvent blocked threads in a better manner. Future enhancements can include merge work task prioritization based on query priority, as well as enhancements to account for potentially blocking code.

Additionally, with how the `fork` and `join` items are intended to work, a ListenableFuture-style workflow can be attained by doing a `join` of a task and performing another task based on the result, so long as the `join` is executed in the FJP.

Other changes
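A minimal illustration of the fork-then-join accumulation described above, with toy integer batches standing in for real query Sequences (the pool size and batch contents are arbitrary):

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;

public class ParallelMergeSketch {
    // Fork one intermediate-merge task per batch in the given pool, then
    // join and fold the partial results into the final answer.
    static int mergeAll(List<List<Integer>> batches, ForkJoinPool mergeFjp) {
        List<ForkJoinTask<Integer>> tasks = batches.stream()
            .map(b -> mergeFjp.submit(
                () -> b.stream().mapToInt(Integer::intValue).sum()))
            .collect(Collectors.toList());
        return tasks.stream().mapToInt(ForkJoinTask::join).sum();
    }

    public static void main(String[] args) {
        // Dedicated pool for merge work, separate from the http server threads;
        // the nested lists stand in for per-historical result batches.
        ForkJoinPool mergeFjp = new ForkJoinPool(4);
        List<List<Integer>> batches =
            List.of(List.of(1, 4), List.of(2, 5), List.of(3, 6));
        System.out.println(mergeAll(batches, mergeFjp)); // 21
        mergeFjp.shutdown();
    }
}
```

Because `join` is executed from within FJP-managed work in the real code, a blocked joiner can steal and run other merge tasks instead of idling, which is the behavior a fixed-size ExecutorService cannot provide.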
Streaming methods are added to the Cache interface in order to accommodate changes in the caching cluster client. `CombiningSequence` is moved out of the `common` module into `java-util` with the other `Sequence` classes.

Additionally, small changes in the way the per-druid-node Sequences are initiated are present. The proposed implementation causes the direct druid client to fire off queries to all the nodes regardless of whether intermediate merges are being used. The cached and uncached results now feed through the same "Stream" and are simply resolved as either a MergeSequence (for cached results) or a Sequence from the direct druid client (for cache misses). This caused some unit tests to be changed to allow timeline lookups where previously there were none.
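A toy illustration of that unified flow, with a `Map` standing in for the cache and a string tag standing in for a direct druid client fetch (all names here are illustrative, not Druid APIs):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class UnifiedStreamSketch {
    // Resolve each segment through one stream: cache hits become their
    // cached value, misses fall through to a (simulated) direct client call.
    static List<String> resolve(List<String> segments, Map<String, String> cache) {
        return segments.stream()
            .map(seg -> cache.containsKey(seg)
                ? cache.get(seg)          // cached result
                : "fetched:" + seg)       // cache miss -> direct druid client
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> cache = Map.of("seg1", "cached:seg1");
        System.out.println(resolve(List.of("seg1", "seg2"), cache));
        // [cached:seg1, fetched:seg2]
    }
}
```

The point of the single stream is that downstream merge code no longer needs to care which branch produced each element.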
Results
Initial internal results show that query speeds were improved by 85% for large queries with `intermediateMergeBatchThreshold` set. The impact on smaller queries has not been fully investigated, and the impact when `intermediateMergeBatchThreshold` is not set (just the new Stream workflow) is not fully exercised.

Work can clearly be seen being done in the ForkJoinPool, with the final merge in `QueryResource.doPost` under the jetty servlet stack.

Not included here
There are other items of interest for larger clusters, such as broker-brokers (or cross-broker merges). Hopefully the migration of CachingClusteredClient onto java streams makes such a possibility easier in the future.

The impact on heap pressure from turning on this feature has not been fully explored. Since this feature is intended to help large clusters with large results, this could be a major problem if not taken care of.

Handling back-pressure better would be a nice improvement. For example, if druid-client request returns could be controlled in a better manner, preventing an arbitrary number of them from returning results of arbitrary size, that would help brokers handle larger results.
TODO
Conclusion
This is a huge performance boon (85%) for one of our core use cases. It would be interesting to see if this helps with large group-by queries as well. Since this is an opt-in feature that is very close to prior behavior when disabled, hopefully the risk of the change is very small, while the way the data stream is handled is clearer (or at least better documented).