Skip to content

Proposal: groupBy query adjustments, introduce merging buffers #2987

@gianm

Description

@gianm

Currently groupBy works like this:

  • factory.createRunner (run on historicals) uses GroupByQueryEngine to return a Sequence<Row> for each segment. For each row returned by the cursor, GBQE uses an array of the dim ids as a key into an on-heap positions map, where the values are positions of aggregators in an off-heap computation buffer. The contents of this buffer are emitted out to the Sequence either when the buffer fills up, or when the positions map size reaches maxIntermediateRows. (I guess this is here to prevent the positions map from taking up too much heap space)
  • factory.mergeRunners (run on historicals) submits factory.createRunner-based tasks to the processing thread pool that use an accumulator to add rows from their sequences to a single IncrementalIndex, allocated by mergeRunners. If at any point there are more than maxResults rows in this index, the query fails. This index might be onheap (default) or might be varying degrees of offheap (new OffHeapIncrementalIndex that does only aggregations off-heap #2325, update OffheapIncrementalIndex to put facts off-heap  #2847). After the processing tasks are all done, the outer runner emits rows from that index. avoid sort while doing groupBy merging when possible #2571 changed this index to be unsorted rather than sorted, because nothing actually depended on the fact that it was sorted.
  • toolchest.mergeResults on the historical used to do an unnecessary merge again in a second index, but at historicals GpBy query mergeResults does not need merging  #2962 adjusted that so it is a no-op.
  • toolchest.mergeResults on the broker handles merging by adding all incoming rows to an IncrementalIndex. If at any point there are more than maxResults rows in this index, the query fails. It then applies limits by possibly re-sorting and limiting the rows coming out of that index. If the limit involves a re-sort then a copy of the results get materialized, otherwise the limit is applied to the stream. If there is a nested query, that also always materializes the inner query result set.

There are some things about this that can be improved:

  1. The maxResults limit is driven by memory constraints right now, it'd be better to allow temporary storage of results on disk so we can return larger result sets.
  2. The number of merging indexes allocated by query.mergeRunners and toolchest.mergeResults is limited only by the jetty thread pool size, which can be large and can cause us to allocate too much memory for merging.
  3. Merging on-heap makes it difficult to manage memory, but previous attempts at merging off-heap have had performance issues. update OffheapIncrementalIndex to put facts off-heap  #2847 looks promising for unsorted results but is still behind the on-heap implementation for sorted results.
  4. The broker could avoid materialization when we are ordering on dimensions rather than metrics, by pushing sort down to historicals and then doing a stream merge instead of a materialized merge.
  5. The broker could push down limits too when we are ordering on dimensions.
  6. The broker could avoid materialization for nested queries and instead apply the outer query directly to the incoming stream (when the inner query can be streamed).

I've been working on an implementation that tries to address 1, 2, 3, and 4 (not 5 or 6 right now). So far in benchmarks performance is similar to or better than the existing on-heap version. In the proposed new implementation:

  • factory.createRunner does something pretty similar, except it stores its equivalent of positions off-heap too and as such, doesn't have a maxIntermediateRows config.
  • DruidProcessingModule provides a new @Merged BlockingPool<ByteBuffer> that can be used for merging. It has a fixed maximum size and it's blocking, so jetty threads can request one when they want to do a query that will require a merge buffer, and then wait in line if one is not available (without submitting tasks to the processing thread pool). Queries that don't need merging buffers don't have to wait.
  • factory.mergeRunners is similar, except instead of using IncrementalIndex for merging, it uses an off-heap hashtable that is not an IncrementalIndex. It uses buffers from the merging pool and it returns sorted results. In this implementation the dictionary is still on-heap but everything else is off-heap. The merger spills to sorted files on disk when either the merge buffer is full, or the dictionary size exceeds a maxMergingDictionarySize limit. The merger can take advantage of features of the groupBy query workload that do not apply to regular indexing: the number of dimensions is known in advance, it's not necessary to be able to read anything until all writes have been done, and it's not necessary to be able to read from more than one thread.
  • toolchest.mergeRunners uses a ResultMergeQueryRunner to merge result streams, which it assumes are ordered on time and dimensions. I think this OK if getResultOrdering in the query is overridden, since the broker uses MergeIterable with that comparator when stitching together streams from data nodes and cache, so they should get ordered properly before they make it to the toolChest. The broker still materializes and sorts results if the limitSpec involves an order by on something other than the dimensions. Otherwise it will just limit the stream.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions