
[Proposal] Use bitmap iteration for filtered aggregators #6889

@peferron

Description

Problem summary

Filtered aggregators are slower than query root filters because they don't leverage bitmap iteration. This is a problem because:

  • This behavior is not obvious to Druid users.
  • There is no good workaround.

Problem details

In Druid parlance, filters are partitioned into pre-filters and post-filters. Pre-filters contain conditions that can use bitmap iteration to jump to the next matching row, such as my_dimension = my_value. Post-filters contain conditions that must be evaluated on every row, such as my_metric > my_value.
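As a toy illustration (Python, not actual Druid code; all names here are hypothetical), a pre-filter's bitmap can be modeled as a sorted list of matching row IDs that the cursor jumps through, while a post-filter has no index and must evaluate its predicate on every row:

```python
def prefilter_rows(bitmap):
    """Pre-filter: the bitmap already knows which rows match,
    so the cursor jumps straight from one match to the next."""
    for row_id in bitmap:  # bitmap modeled as a sorted list of row IDs
        yield row_id

def postfilter_rows(num_rows, predicate):
    """Post-filter: no index is available, so the predicate must be
    evaluated against every row in the segment."""
    for row_id in range(num_rows):
        if predicate(row_id):
            yield row_id
```

With 3 matching rows out of 10, the pre-filter touches only 3 rows, while the post-filter evaluates its predicate 10 times.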

Query root filters are efficient because they use bitmap iteration to jump directly to the next row that matches the root pre-filters.

Filtered aggregators are less efficient because they evaluate their pre-filters on every row that matches the root filter.

The result is that this sample query written using a root filter:

{
   "filter": {"type": "selector", "dimension": "my_dim", "value": "my_val"},
   "aggregations": [
       {"type": "count", "name": "my_agg"}
   ]
}

Can be several times faster than the equivalent query written using a filtered aggregator:

{
   "filter": null,
   "aggregations": [
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim", "value": "my_val"},
           "aggregator" : {"type": "count", "name": "my_agg"}
       }
   ]
}

The obvious workaround is to rewrite the second query above into the first before sending it to Druid. But this only works on a subset of queries. For example, the following query cannot be rewritten:

{
   "filter": null,
   "aggregations": [
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim_1", "value": "my_val_1"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg_1"}
       },
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim_2", "value": "my_val_2"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg_2"}
       }
   ]
}

The next workaround is to add a root filter that ORs together the aggregator filters:

{
   "filter": {
       "type": "or",
       "fields": [
           {"type": "selector", "dimension": "my_dim_1", "value": "my_val_1"},
           {"type": "selector", "dimension": "my_dim_2", "value": "my_val_2"}
       ]
   },
   "aggregations": [
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim_1", "value": "my_val_1"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg_1"}
       },
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim_2", "value": "my_val_2"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg_2"}
       }
   ]
}

But again, this only performs well on the subset of queries where the resulting union has low selectivity. The worst case is a query that contains an unfiltered aggregation, since then no root filter can be added at all:

{
   "filter": null,
   "aggregations": [
       {
           "type": "count",
           "name": "my_unfiltered_agg"
       },
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim", "value": "my_val"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg"}
       }
   ]
}

The next workaround is to split the query into sub-queries that are sent to Druid separately and processed in parallel:

// Sub-query #1
{
   "filter": null,
   "aggregations": [
       {"type": "count", "name": "my_unfiltered_agg"}
   ]
}

// Sub-query #2
{
   "filter": {"type": "selector", "dimension": "my_dim", "value": "my_val"},
   "aggregations": [
       {"type": "count", "name": "my_filtered_agg"}
   ]
}

But this is complicated for users to implement, adds processing overhead, and can introduce inconsistency, since the sub-queries may not scan the exact same set of rows, especially in a streaming ingestion scenario.

Overall this means there's no good workaround. Please let me know if I missed one.

Potential solutions

To keep things simple, let's assume that all aggregators are filtered and that only pre-filters exist (no post-filters). Let's also ignore any potential optimizations around shared filters (#4267).

Current implementation (for reference): test each aggregator on each row

for row in segment:
  for aggregator in aggregators:
    if aggregator.bitmap.has(row):
      aggregator.aggregate(row)

Pros: simple to implement, which means fewer bugs and easier maintenance.

Cons: as mentioned earlier, the bitmap value is checked for every single row in the segment. This is inefficient, especially for low-selectivity filters.
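To make that cost concrete, here is a small runnable sketch of the current strategy (the class and method names are hypothetical stand-ins, not the actual Druid classes), with a counter on the number of bitmap membership checks:

```python
class ToyFilteredAgg:
    """Hypothetical filtered count aggregator; `checks` records how
    many bitmap membership tests were performed."""
    def __init__(self, matching_rows):
        self.bitmap = set(matching_rows)
        self.checks = 0
        self.count = 0

    def has(self, row):
        self.checks += 1
        return row in self.bitmap

    def aggregate(self, row):
        self.count += 1

def run_current(num_rows, aggregators):
    # Current behavior: every aggregator tests every row in the segment.
    for row in range(num_rows):
        for agg in aggregators:
            if agg.has(row):
                agg.aggregate(row)
```

With a 1,000-row segment and a filter matching a single row, the aggregator still performs 1,000 membership checks to aggregate one row.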

Solution A: compute each aggregator separately

for aggregator in aggregators:
  for row in aggregator.bitmap:
    aggregator.aggregate(row)

Pros: uses bitmap iteration to skip non-matching rows.

Cons: scans through the data multiple times. Since Druid storage is columnar, this may not be a problem if the filtered aggregators read mutually disjoint sets of columns. But if there's overlap, then data locality suffers, since the same data is decompressed and read multiple times.

Solution B: repeatedly find the aggregator with the lowest next matched row

while not done:
  aggregator, row = find_aggregator_with_lowest_next_matched_row(aggregators)
  aggregator.aggregate(row)

This can be implemented in multiple ways. A typical implementation is via a priority queue holding the next matched row of each aggregator's bitmap.

Pros: uses bitmap iteration to skip non-matching rows, while maintaining single-pass in-order data access.

Cons: increased complexity, and the extra overhead makes it difficult to beat the current implementation in all scenarios.
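For concreteness, here is a minimal Python sketch of the priority-queue variant, assuming each aggregator exposes a sorted iterator over its bitmap's matching row IDs (the `rows()` and `aggregate()` names are hypothetical, not Druid's actual interfaces):

```python
import heapq

def aggregate_with_heap(aggregators):
    """Solution B sketch: a min-heap keyed on each aggregator's next
    matching row visits rows in a single in-order pass, skipping rows
    that match no aggregator at all."""
    heap = []
    for index, agg in enumerate(aggregators):
        rows = iter(agg.rows())  # sorted row IDs from the bitmap
        first = next(rows, None)
        if first is not None:
            # `index` breaks ties so aggregator objects are never compared
            heapq.heappush(heap, (first, index, agg, rows))
    while heap:
        row, index, agg, rows = heapq.heappop(heap)
        agg.aggregate(row)
        nxt = next(rows, None)
        if nxt is not None:
            heapq.heappush(heap, (nxt, index, agg, rows))
```

If two aggregators match the same row, both are popped for that row before the cursor moves on, so the global access pattern over the underlying columns stays non-decreasing.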

Early results

I benchmarked a few different implementations of solution B. I'm not linking to the code because I'd rather get feedback about the overall idea first, and the code will get thrown away anyway.

A min-heap implementation yielded the following results on a set of arbitrary test queries (each number is the ratio of avg ns/op compared to unpatched for that test query, so < 1 is faster and > 1 is slower):

1.02, 1.00, 0.11, 0.15, 0.47, 1.22, 1.09, 0.03, 0.45, 0.99, 0.68

The massive speed-ups such as 0.11 and 0.03 happen for test queries that contain low-selectivity filtered aggregators, which makes sense.

Another somewhat weird implementation on top of Roaring yielded less extreme speed-ups, but also smaller regressions:

1.02, 1.02, 0.12, 0.15, 0.48, 1.01, 0.96, 0.09, 0.95, 0.91, 0.79

The last two test queries mimic real queries that we run in production, and get 10-30% speed-ups.

I cut a few corners hacking this in, and I'm not sure what the final performance would look like. I'm especially concerned about extra garbage and memory pressure for some implementations which may not show up in benchmarks. But at least it seems worth looking into.

Next steps

I'm not that familiar with the Druid code base, so is there any obvious solution or blocker that I missed? Otherwise I'd love to get some general feedback about this idea.

Indexes differentiate Druid from its competition—which is usually faster on raw scan speed—so I think it makes sense to try to squeeze as much out of indexes as possible, especially when implementations as good as Roaring are available.
