Conversation
…ers. - Introduce a new SqlBenchmark geared towards benchmarking a wide variety of SQL queries. Rename the old SqlBenchmark to SqlVsNativeBenchmark. - Add (optional) caching to SegmentGenerator to enable easier benchmarking of larger segments. - Add vectorization to FilteredAggregatorBenchmark and GroupByBenchmark.
This patch includes vectorized timeseries and groupBy engines, as well as some analogs of your favorite Druid classes: - VectorCursor is like Cursor. (It comes from StorageAdapter.makeVectorCursor.) - VectorColumnSelectorFactory is like ColumnSelectorFactory, and it has methods to create analogs of the column selectors you know and love. - VectorOffset and ReadableVectorOffset are like Offset and ReadableOffset. - VectorAggregator is like BufferAggregator. - VectorValueMatcher is like ValueMatcher. There are some noticeable differences between vectorized and regular execution: - Unlike regular cursors, vector cursors do not understand time granularity. They expect query engines to handle this on their own, which a new VectorCursorGranularizer class helps with. This is to avoid too much batch-splitting and to respect the fact that vector selectors are somewhat more heavyweight than regular selectors. - Unlike FilteredOffset, FilteredVectorOffset does not leverage indexes for filters that might partially support them (like an OR of one filter that supports indexing and another that doesn't). I'm not sure that this behavior is desirable anyway (it is potentially too eager) but, at any rate, it'd be better to harmonize it between the two classes. Potentially they should both do some different thing that is smarter than what either of them is doing right now. - When vector cursors are created by QueryableIndexCursorSequenceBuilder, they use a morphing binary-then-linear search to find their start and end rows, rather than linear search. Limitations in this patch are: - Only timeseries and groupBy have vectorized engines. - GroupBy doesn't handle multi-value dimensions yet. - Vector cursors cannot handle virtual columns or descending order. - Only some filters have vectorized matchers: "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not". - Only some aggregators have vectorized implementations: "count", "doubleSum", "floatSum", "longSum", "hyperUnique", and "filtered". - Dimension specs other than "default" don't work yet (no extraction functions or filtered dimension specs). Currently, the testing strategy includes adding vectorization-enabled tests to TimeseriesQueryRunnerTest, GroupByQueryRunnerTest, GroupByTimeseriesQueryRunnerTest, CalciteQueryTest, and all of the filtering tests that extend BaseFilterTest. In all of those classes, there are some test cases that don't support vectorization. They are marked by special function calls like "cannotVectorize" or "skipVectorize" that tell the test harness to either expect an exception or to skip the test case. Testing should be expanded in the future -- a project in and of itself. Related to apache#3011.
|
Very cool, I think both @xvrl and @nishantmonu51 may have taken a hack at this previously as well. There is even https://github.com/metamx/druid/tree/gpudruid if anyone wants to peek at super old code. Looking forward to see how this patch performs |
|
There's some benchmarks in the description (https://static.imply.io/gianm/vb.html) & I have also run some queries on a real cluster and seen similar speedups. Although notably, topN isn't vectorized in this patch, and contributions are welcome on that one 😄. I would expect speedup ratios there similar to groupBy, maybe a bit less since topN already has some optimizations targeted at reducing method call overhead (the monomorphic specialization system). There's some TeamCity inspections going bump because of unused stuff. Most of it is there for a reason so I'll add some comments that hopefully make the inspections happy. |
|
Updated the top comment to be more proposal-y following discussion on the dev list. |
I do not think this is going to work, because the updates to the description is something that only authors are doing and there is no tracking for history. |
There is history (click "edited" in the top bar of the comment). I don't think there's anything wrong with edits coming from the author of the proposal, so long as they reflect community discussion.
I used the Kafka template from https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. As I understand it, this section means "rejected by the author of the proposal", and is meant to help reflect the author's thinking in the course of developing the proposal. In the current version it reflects my thinking alone. I would update it to reflect community discussion.
That sounds like a good discussion point!! Would love to hear any more thoughts you have on this topic. As I see it- supporting Arrow as an export interface and using it internally are two different things and could happen independently of each other. There is also a potential case to be made around using Arrow for the later stages of query execution, after per-segment processing, even if it's not used for per-segment processing. Druid doesn't currently always use optimized data structures for this phase (there's a lot of in-heap Java objects involved, and some glaring inefficiencies like representing rows using LinkedHashMaps). Fwiw, the KIP process suggests the following for discussion,
If we want to try something similar -- we can do that in comments for this PR. Do you think that will work? |
|
Some previous discussion on Arrow as an export format in particular: #3891 (should be largely unrelated to this proposal, which is concerned with internal processing, not export). |
This is true, but I would like to suggest to use a better tool for easy discussion. I think the proposal will be usually large, which means it's not easy to point some particular parts of the proposal. Probably we can use Github PR for the proposals as well, so that people can easily leave comments. |
As i said i am not targeting per-say this PR. Ideally would love to see a formal agreement about the process on the dev mailing list and then maybe start implementing it on this PR if possible.
Thanks i guess i learned new thing today. FYI as a side note I honestly prefer a separation between Code reviewing and Proposal reviewing discussion , for the simple reason that when the code review starts it will introduce a lot of noise about the implementation details and therefore all the discussion about design will be lost or not possible to sort out, In fact Github pages becomes unloadable once we get bunch of comments. |
|
Sounds good, let's take that part of the discussion to the mailing list. Still very open to comments on this particular change though. |
|
I added a "Future work" section that illustrates what would be left to be done before the feature is 'complete'. I don't necessarily think it makes sense to do any of it in the first PR, since the first PR would just be to lay a foundation that allows the other work to happen. |
|
👍 on this proposal - looks to be a win across the board |
|
@gianm do you want to keep the discussion here or start an issue as design review? |
|
Noticed that |
|
Do you think the VectorCursor Api for Realtime based segments is going to be the same? or you have something else in mind? |
|
Also i saw this |
|
wondering why we need to have |
Do we really need to support select? IMO we need to drop it as soon as we can it causes lot of OOM and i don't see why would use it when you can use scan. |
I was waiting to see how the thread on the dev list goes and then do what we decide there.
I wasn't sure how to translate that specific API to the vectorized impl (it doesn't necessarily make sense to seek to a specific offset when you're talking about blocks of rows) & thought it made more sense for the query engines that need it to do it themselves. It's only used in one place by the select query.
It should be the same, I think, just like the regular Cursor is the same. You would expect vectorization to not have as much of an impact on IncrementalIndex, because bulk column reads don't improve efficiency the same way they do on a QueryableIndex, but you might still see some benefit on other parts of the query pipeline.
They're used for topN prototypes & were added in #3889. I'm not sure if we'll want vectorized analogs or not. I am thinking that the topN prototype system is probably not as necessary for a potentially vectorized topN engine, since part of the point of monomorphic processing is to reduce virtual method call overhead, but vectorization does that too (in a different way).
I think it ended up being used by AggregatorFactories, which get a column selector factory when they factorize an aggregator, but not a cursor. It ended up being useful to add max/current vector size getters to a variety of places for reasons like that.
#6088 is the only real reason to use select. I think when we have a solution to that problem we should definitely deprecate & drop it. I could see the potential to never vectorize it, and favor improving scan -> dropping select instead. It'd just be important to vectorize all query engines that exist at some point in time, if we want to get rid of the non-vectorized code paths. |
|
The Travis errors are interesting; the tests run OOM. I got a heap dump and saw that ~78% of the heap is being used by org.apache.maven.plugin.surefire.report.WrappedReportEntry objects. It sounds like https://issues.apache.org/jira/browse/SUREFIRE-1147. It seems like there are too many tests now for surefire to handle! I'll try updating to the latest 2.x surefire (2.22.2). If that doesn't change anything I'll try bumping up the Xmx in Travis a bit. Right now it's set to 512m. If that doesn't work then I guess we either need to patch surefire, or refactor the tests to not have so many test cases. |
|
Updating surefire didn't help, but I kept the new version anyway. I'll try bumping Xmx to 800m. |
|
It looks like that bump worked… for now :) |
clintropolis
left a comment
There was a problem hiding this comment.
Finally finished line by line review over the past couple months, taking notes until I finished. I've actually been watching this branch for over a year now! It's a ton more complete and polished now than it was then 😜
Overall I'm +1 to merge this; I think the interfaces look good, the code is mostly isolated in a new, experimental codepath, and where existing things are touched it's either just pushing down part of an interface so it can be shared, or leaving it in a cleaner/better place than it was before.
I had a handful of comments, but consider them all basically nitpicking, and would be ok if it was merged as is 🤘
That said, I've a few of the more complex parts of the code I'm going to have another pass at soon to make sure I didn't miss anything because this PR is a beast!
| private final AtomicInteger seed; | ||
| // Setup can take a long time due to the need to generate large segments. | ||
| // Allow users to specify a cache directory via a JVM property or an environment variable. | ||
| private static final String CACHE_DIR_PROPERTY = "druid.benchmark.cacheDir"; |
| private GroupByQuery groupByQuery; | ||
| private String sqlQuery; | ||
| private Closer resourceCloser; | ||
| private static final List<String> QUERIES = ImmutableList.of( |
There was a problem hiding this comment.
nice collection of queries 👍
Side note, my gut tells me that the sequential dimensions and metrics we have in the benchmarks schemas are probably not super typical of data that appears in real datasets other than maybe the time column?
The zipf dimension in the basic schema (and all current benchmark schemas) is pretty low cardinality because of how it's currently setup where it enumerates out all the values into an array. It takes an a lot of memory and is very slow to do higher cardinalities. To get high cardinality zipf distributions I had to modify the generator to make a lazy version, which I hope to make it in someday, and could help produce additional column value distributions for higher cardinalities and with different exponents.
Where I'm going with this is, that I think it could be interesting in the future to tweak these to allow running the same queries on different value distributions to see if there is any effect, though I'm unsure how we would handle filter value matches.
No need to do anything in this PR tho imo, just thinking out loud about stuff to do in the future maybe
There was a problem hiding this comment.
That all sounds good. More realistic benchmark datasets would be great. Maybe we could even use a realistic data generator like https://github.com/joke2k/faker (if there is something like this available in Java).
|
|
||
| |property|default| description| | ||
| |--------|-------|------------| | ||
| |vectorize|`false`|Enables or disables vectorized query execution. Possible values are `false` (disabled), `true` (enabled if possible, disabled otherwise, on a per-segment basis), and `force` (enabled, and groupBy or timeseries queries that cannot be vectorized will fail). The `"force"` setting is meant to aid in testing, and is not generally useful in production (since real-time segments can never be processed with vectorized execution, any queries on real-time data will fail).| |
There was a problem hiding this comment.
What is the reason for force exploding for any queries on real-time data instead of just ignoring it and doing non-vectorized? Is it a hassle to ignore it for incremental indexes only?
I think I understand the utility in the context of historical servers, as a mechanism to ensure that the query that is being tested fully supports vectorization as we add additional vectorized code paths, but if we don't anticipate vectorization to be worth it for incremental index then the purpose of exploding here seems lower. Or do you imagine that someday maybe realtime will also be vectorized and want to leave this in as an option?
This is totally a nit btw and not saying it should be changed. Since this option is just for testing we can just like... control the interval being queried to not hit any incremental indexes and not have any issues.
There was a problem hiding this comment.
The idea is that if you set force then you know your query will be 100% vectorized. It's meant to aid in testing and development -- if you want to be totally sure you're testing vectorized code paths. true is the setting that is meant to be useful in production scenarios.
By the way, I don't see a reason to not eventually vectorize processing of the IncrementalIndex. I'm not sure if it'll give a speed boost, but it might allow us to eventually delete the nonvectorized code paths. Maintaining both forever would be a pain.
| void identity(String identity); | ||
|
|
||
| /** | ||
| * Sets whether are not a segment scan has been vectorized. Generally expected to only be attached to segment-level |
There was a problem hiding this comment.
Whoa, how did I miss that? Rookie mistake.
| return mapFn.apply(input); | ||
| } | ||
| } | ||
| mapFn |
|
|
||
| for (int i = 0; i < numRows; i++) { | ||
| final int position = positions[i] + positionOffset; | ||
| buf.putDouble(position, buf.getDouble(position) + vector[rows != null ? rows[i] : i]); |
There was a problem hiding this comment.
nit: maybe I'm being overly cautious and branch prediction will make this not matter, but why leave it to chance and maybe instead consider splitting this into if (rows != null) { for ... } else { for ... }. Same for float/long if you end up changing here.
There was a problem hiding this comment.
I'm not sure if it matters or not. It might be something interesting to check out after the dust settles on the patch. In the face of uncertainty about which path is best, I decided to err on the side of the more readable code. (Which I think the current patch is)
| : DateTimes.utc(Long.parseLong(fudgeTimestampString)); | ||
|
|
||
| final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())); | ||
| final Interval interval = Iterables.getOnlyElement(query.getIntervals()); |
There was a problem hiding this comment.
makes sense that this should be enforcing that intervals only has 1 rather than silently eating it like before
There was a problem hiding this comment.
Yeah, I figured that too.
| * known that there are no nulls in the vector, possibly because the column is non-nullable. | ||
| */ | ||
| @Nullable | ||
| boolean[] getNullVector(); |
There was a problem hiding this comment.
I guess this is a boolean array instead of bitset or something similar since it's relatively small overall?
There was a problem hiding this comment.
Honestly I hadn't tried options other than the boolean[]. The amount of memory use here is minimal and I thought the array would be faster. But I didn't put a ton of thought into this decision. It could change later if we do some experiments to see what's actually best.
| final ByteBuffer keyBuffer, | ||
| final int keyHash | ||
| final int keyHash, | ||
| final Runnable preTableGrowthRunnable |
There was a problem hiding this comment.
Could you add javadoc about what is going on with here with Runnable preTableGrowthRunnable?
| @Override | ||
| public BatchIterator batchIterator() | ||
| { | ||
| return bitmap.getBatchIterator(); |
clintropolis
left a comment
There was a problem hiding this comment.
lgtm, thanks for quick turnaround 🤘
| * At this threshold, timestamp searches switch from binary to linear. See | ||
| * {@link #timeSearch(NumericColumn, long, int, int, int)} for more details. | ||
| */ | ||
| private static final int TOO_CLOSE_FOR_MISSILES = 15000; |
There was a problem hiding this comment.
Just wonder how that value was chosen?
There was a problem hiding this comment.
See this comment from the timeSearch method:
The idea is to avoid too much decompression buffer thrashing. The default value
TOO_CLOSE_FOR_MISSILESis chosen to be similar to the typical number of timestamps per block.
I moved the sentence about choice of default value to the javadoc for TOO_CLOSE_FOR_MISSILES, and kept the "idea" comment in the javadoc for timeSearch.
There was a problem hiding this comment.
It's not clear why the typical number of timestamps per block is 15000?
There was a problem hiding this comment.
It's not clear why the typical number of timestamps per block is 15000?
Each block in a long typed column is 64KB, so if a timestamp can be stored in 3-4 bytes (typical with delta encoding) there's roughly that number per block. Or really a bit more.
But actually, you know what, I had never benchmarked it until today. I just benchmarked it and it looks like the pure binary search is fastest anyway. So, I removed the switching and made the search purely binary.
Benchmark results:
Benchmark (query) (rowsPerSegment) (timestampString) (tooCloseForMissiles) (vectorize) Mode Cnt Score Error Units
timeSearch 15 5000000 2000 0 force avgt 10 0.647 ± 0.001 ms/op
timeSearch 15 5000000 2000 5000 force avgt 10 0.643 ± 0.001 ms/op
timeSearch 15 5000000 2000 15000 force avgt 10 0.643 ± 0.001 ms/op
timeSearch 15 5000000 2000 25000 force avgt 10 0.579 ± 0.001 ms/op
timeSearch 15 5000000 2000 50000 force avgt 10 0.514 ± 0.001 ms/op
timeSearch 15 5000000 2000 99999999 force avgt 10 ≈ 10⁻⁴ ms/op
timeSearch 15 5000000 2000-01-01T01:00:00 0 force avgt 10 0.578 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T01:00:00 5000 force avgt 10 0.589 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T01:00:00 15000 force avgt 10 0.588 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T01:00:00 25000 force avgt 10 0.751 ± 0.008 ms/op
timeSearch 15 5000000 2000-01-01T01:00:00 50000 force avgt 10 0.607 ± 0.004 ms/op
timeSearch 15 5000000 2000-01-01T01:00:00 99999999 force avgt 10 2.106 ± 0.078 ms/op
timeSearch 15 5000000 2000-01-01T10:30 0 force avgt 10 0.708 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T10:30 5000 force avgt 10 0.722 ± 0.002 ms/op
timeSearch 15 5000000 2000-01-01T10:30 15000 force avgt 10 0.728 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T10:30 25000 force avgt 10 0.773 ± 0.016 ms/op
timeSearch 15 5000000 2000-01-01T10:30 50000 force avgt 10 0.854 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T10:30 99999999 force avgt 10 21.863 ± 0.146 ms/op
timeSearch 15 5000000 2000-01-01T16:30 0 force avgt 10 0.643 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T16:30 5000 force avgt 10 0.653 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T16:30 15000 force avgt 10 0.668 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-01T16:30 25000 force avgt 10 0.685 ± 0.004 ms/op
timeSearch 15 5000000 2000-01-01T16:30 50000 force avgt 10 0.935 ± 0.013 ms/op
timeSearch 15 5000000 2000-01-01T16:30 99999999 force avgt 10 39.498 ± 0.022 ms/op
timeSearch 15 5000000 2000-01-02 0 force avgt 10 0.602 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-02 5000 force avgt 10 0.616 ± 0.003 ms/op
timeSearch 15 5000000 2000-01-02 15000 force avgt 10 0.645 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-02 25000 force avgt 10 0.707 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-02 50000 force avgt 10 0.811 ± 0.001 ms/op
timeSearch 15 5000000 2000-01-02 99999999 force avgt 10 61.194 ± 0.036 ms/op
There was a problem hiding this comment.
@egor-ryashin What do you think of the latest changes?
There was a problem hiding this comment.
Removing tooCloseForMissiles check, isn't it an equivalent of setting tooCloseForMissiles = 99999999
which shows slower operations?
There was a problem hiding this comment.
@egor-ryashin It would be the equivalent of tooCloseForMissiles = 0, since I removed a check that did a “break” out of the binary search when maxIndex and minIndex were less than tooCloseForMissiles apart.
(the threshold was: when search is this close, switch to linear. So 99999999 means fully linear and 0 means fully binary)
|
Going to merge in #8049 to this branch when it's available, since that should clean up Travis results again. Thanks to everyone that reviewed. @egor-ryashin I think I have addressed all your comments, let me know if you agree. I'm looking to merge this to master soon so we can start building on top of it. |
| * Even though this array is technically mutable, it is very poor form to mutate it if you are not the owner of the | ||
| * VectorMatch object. | ||
| */ | ||
| int[] getSelection(); |
There was a problem hiding this comment.
Could you mention HotSpot vectorization term in the comment?
| * At this threshold, timestamp searches switch from binary to linear. See | ||
| * {@link #timeSearch(NumericColumn, long, int, int, int)} for more details. | ||
| */ | ||
| private static final int TOO_CLOSE_FOR_MISSILES = 15000; |
There was a problem hiding this comment.
Just wonder how that value was chosen?
| } | ||
|
|
||
| final int result = timeSearch(timestamps, interval.getEndMillis(), startOffset, index.getNumRows()); | ||
| if (result >= 0) { |
|
Just pushed up another commit with this change: #6794 (comment) |
|
Thanks everyone!! |


Implementation of the proposal in #7093.