Merged
93 changes: 92 additions & 1 deletion docs/content/querying/groupbyquery.md
@@ -56,7 +56,7 @@ An example groupBy query object is shown below:
}
```

There are 11 main parts to a groupBy query:
The following are the main parts of a groupBy query:

|property|description|required?|
|--------|-----------|---------|
@@ -70,6 +70,7 @@ There are 11 main parts to a groupBy query:
|aggregations|See [Aggregations](../querying/aggregations.html)|no|
|postAggregations|See [Post Aggregations](../querying/post-aggregations.html)|no|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|subtotalsSpec| A JSON array of arrays that returns additional result sets for groupings of subsets of the top-level `dimensions`. It is [described later](groupbyquery.html#more-on-subtotalsspec) in more detail.|no|
|context|An additional JSON Object which can be used to specify certain flags.|no|

To pull it all together, the above query would return *n\*m* data points, up to a maximum of 5000 points, where n is the cardinality of the `country` dimension, m is the cardinality of the `device` dimension, each day between 2012-01-01 and 2012-01-03, from the `sample_datasource` table. Each data point contains the (long) sum of `total_usage` if the value of the data point is greater than 100, the (double) sum of `data_transfer` and the (double) result of `total_usage` divided by `data_transfer` for the filter set for a particular grouping of `country` and `device`. The output looks like this:
@@ -113,6 +114,92 @@ improve performance.

See [Multi-value dimensions](multi-value-dimensions.html) for more details.

### More on subtotalsSpec
The subtotals feature allows computation of multiple sub-groupings in a single query. To use this feature, add a "subtotalsSpec" to your query: a list of subgroup dimension sets. Each set should contain the "outputName" of dimensions from your "dimensions" attribute, in the same order they appear there (although, of course, you may skip some). For example, consider a groupBy query like this one:

```json
{
  "type": "groupBy",
  ...
  "dimensions": [
    {
      "type" : "default",
      "dimension" : "d1col",
      "outputName": "D1"
    },
    {
      "type" : "extraction",
      "dimension" : "d2col",
      "outputName" : "D2",
      "extractionFn" : extraction_func
    },
    {
      "type": "lookup",
      "dimension": "d3col",
      "outputName": "D3",
      "name": "my_lookup"
    }
  ],
  ...
  "subtotalsSpec": [ ["D1", "D2", "D3"], ["D1", "D3"], ["D3"] ],
  ...
}
```

The response is equivalent to concatenating the results of three groupBy queries whose "dimensions" fields are ["D1", "D2", "D3"], ["D1", "D3"] and ["D3"], using the corresponding `DimensionSpec` JSON blobs from the query above.
The response for the above query would look something like this:

```json
[
  {
    "version" : "v1",
    "timestamp" : "t1",
    "event" : { "D1": "..", "D2": "..", "D3": ".." }
  },
  {
    "version" : "v1",
    "timestamp" : "t2",
    "event" : { "D1": "..", "D2": "..", "D3": ".." }
  },
  ...
  {
    "version" : "v1",
    "timestamp" : "t1",
    "event" : { "D1": "..", "D3": ".." }
  },
  {
    "version" : "v1",
    "timestamp" : "t2",
    "event" : { "D1": "..", "D3": ".." }
  },
  ...
  {
    "version" : "v1",
    "timestamp" : "t1",
    "event" : { "D3": ".." }
  },
  {
    "version" : "v1",
    "timestamp" : "t2",
    "event" : { "D3": ".." }
  },
  ...
]
```
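Conceptually, a subtotals run re-groups the top-level result rows by each listed dimension subset in turn and concatenates the result sets. The sketch below is purely illustrative (plain Java over simplified row maps, not Druid's implementation; the `subtotal` helper is made up), assuming a single additive "count" metric:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SubtotalsSketch {
    // Re-group pre-aggregated top-level rows by a subset of dimension output names,
    // summing a single additive "count" metric. Rows are simplified to plain maps.
    static List<Map<String, Object>> subtotal(List<Map<String, Object>> rows, List<String> dims) {
        Map<List<Object>, Long> grouped = new LinkedHashMap<>();
        for (Map<String, Object> row : rows) {
            List<Object> key = dims.stream().map(row::get).collect(Collectors.toList());
            grouped.merge(key, (Long) row.get("count"), Long::sum);
        }
        List<Map<String, Object>> out = new ArrayList<>();
        grouped.forEach((key, count) -> {
            Map<String, Object> event = new LinkedHashMap<>();
            for (int i = 0; i < dims.size(); i++) {
                event.put(dims.get(i), key.get(i));
            }
            event.put("count", count);
            out.add(event);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
            Map.<String, Object>of("D1", "a", "D2", "x", "count", 2L),
            Map.<String, Object>of("D1", "a", "D2", "y", "count", 3L),
            Map.<String, Object>of("D1", "b", "D2", "x", "count", 5L)
        );
        // "subtotalsSpec": [["D1", "D2"], ["D1"], []]: one result set per subset, concatenated.
        List<Map<String, Object>> result = new ArrayList<>();
        for (List<String> subset : List.of(List.of("D1", "D2"), List.of("D1"), List.<String>of())) {
            result.addAll(subtotal(rows, subset));
        }
        System.out.println(result);
    }
}
```

Note that the empty subset `[]` yields a single grand-total row, which is how an overall total can be requested alongside finer groupings.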

### Implementation details

#### Strategies
@@ -182,6 +269,10 @@ With groupBy v2, cluster operators should make sure that the off-heap hash table
will not exceed available memory for the maximum possible concurrent query load (given by
druid.processing.numMergeBuffers). See [How much direct memory does Druid use?](../operations/performance-faq.html) for more details.

Brokers do not need merge buffers for basic groupBy queries. Queries with subqueries (using a "query" [dataSource](datasource.html#query-data-source)) require one merge buffer if there is a single subquery, or two merge buffers if there is more than one layer of nested subqueries. Queries with [subtotals](groupbyquery.html#more-on-subtotalsspec) need one merge buffer. These can stack on top of each other: a groupBy query with multiple layers of nested subqueries, and that also uses subtotals, will need three merge buffers.

Historicals and ingestion tasks need one merge buffer for each groupBy query, unless [parallel combination](groupbyquery.html#parallel-combine) is enabled, in which case they need two merge buffers per query.
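The buffer accounting in the two paragraphs above can be condensed into a small helper. This is purely illustrative (the method names are made up; Druid exposes no such API):

```java
public class MergeBufferMath {
    // Estimate broker merge buffers per the rules above.
    // nestedSubqueryLayers: depth of "query" dataSource nesting (0 = no subquery).
    static int brokerMergeBuffers(int nestedSubqueryLayers, boolean hasSubtotals) {
        int buffers = 0;
        if (nestedSubqueryLayers == 1) {
            buffers += 1; // a single subquery needs one merge buffer
        } else if (nestedSubqueryLayers > 1) {
            buffers += 2; // more than one layer of nesting needs two
        }
        if (hasSubtotals) {
            buffers += 1; // subtotals need one more, stacking on the above
        }
        return buffers;
    }

    // Historicals and ingestion tasks: one buffer per query, or two with parallel combine.
    static int historicalMergeBuffers(boolean parallelCombine) {
        return parallelCombine ? 2 : 1;
    }
}
```

For example, a query with multiple layers of nested subqueries that also uses subtotals needs 2 + 1 = 3 broker merge buffers, matching the text above.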

When using groupBy v1, all aggregation is done on-heap, and resource limits are done through the parameter
druid.query.groupBy.maxResults. This is a cap on the maximum number of results in a result set. Queries that exceed
this limit will fail with a "Resource limit exceeded" error indicating they exceeded their row limit. Cluster
70 changes: 70 additions & 0 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Functions;
@@ -100,6 +101,7 @@ public static Builder builder()
private final List<DimensionSpec> dimensions;
private final List<AggregatorFactory> aggregatorSpecs;
private final List<PostAggregator> postAggregatorSpecs;
private final List<List<String>> subtotalsSpec;

private final boolean applyLimitPushDown;
private final Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
@@ -116,6 +118,7 @@ public GroupByQuery(
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("subtotalsSpec") List<List<String>> subtotalsSpec,
@JsonProperty("context") Map<String, Object> context
)
{
@@ -130,6 +133,7 @@ public GroupByQuery(
postAggregatorSpecs,
havingSpec,
limitSpec,
subtotalsSpec,
null,
context
);
@@ -172,6 +176,7 @@ private GroupByQuery(
final List<PostAggregator> postAggregatorSpecs,
final HavingSpec havingSpec,
final LimitSpec limitSpec,
final @Nullable List<List<String>> subtotalsSpec,
final @Nullable Function<Sequence<Row>, Sequence<Row>> postProcessingFn,
final Map<String, Object> context
)
@@ -194,6 +199,7 @@ private GroupByQuery(
this.havingSpec = havingSpec;
this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec);

this.subtotalsSpec = verifySubtotalsSpec(subtotalsSpec, dimensions);

// Verify no duplicate names between dimensions, aggregators, and postAggregators.
// They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other.
@@ -206,6 +212,40 @@ private GroupByQuery(
this.applyLimitPushDown = determineApplyLimitPushDown();
}

private List<List<String>> verifySubtotalsSpec(List<List<String>> subtotalsSpec, List<DimensionSpec> dimensions)
{
// If subtotalsSpec exists, validate that all of its entries are subsets of the dimensions spec
// and are in the same order. For example, if we had {D1, D2, D3} in the dimensions spec, then
// {D2}, {D1, D2}, {D1, D3}, {D2, D3}, etc. are valid in subtotalsSpec, while
// {D2, D1} is not, as it is not in the same order, and
// {D4} is not, as it is not a subset.
// This restriction is enforced because the implementation does a sort-merge on the results of the
// top-level query and expects that the ordering of events does not change when dimension columns
// are removed from the results of the top-level query.
if (subtotalsSpec != null) {
for (List<String> subtotalSpec : subtotalsSpec) {
int i = 0;
for (String s : subtotalSpec) {
boolean found = false;
for (; i < dimensions.size(); i++) {
if (s.equals(dimensions.get(i).getOutputName())) {
found = true;
break;
}
}
if (!found) {
throw new IAE(
"Subtotal spec %s is either not a subset or items are in different order than in dimensions.",
subtotalSpec
);
}
}
}
}

return subtotalsSpec;
}
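The ordered-subset rule that this method enforces can be sketched as a standalone predicate. This is illustrative only (`isOrderedSubset` is not part of Druid, and it differs slightly from the method above in that it advances past each match):

```java
import java.util.List;

public class SubtotalsOrderCheck {
    // True if `subtotal` is a subset of `dims` that preserves their relative order.
    static boolean isOrderedSubset(List<String> subtotal, List<String> dims) {
        int i = 0;
        for (String s : subtotal) {
            while (i < dims.size() && !dims.get(i).equals(s)) {
                i++; // skip dimensions that this subtotal set omits
            }
            if (i == dims.size()) {
                return false; // s not found at or after the current position:
                              // either out of order or not a dimension at all
            }
            i++; // advance past the matched dimension
        }
        return true;
    }
}
```

With dims {D1, D2, D3}, this accepts {D1, D3} and the empty set, and rejects {D2, D1} and {D4}, matching the comment above.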

@JsonProperty
public VirtualColumns getVirtualColumns()
{
@@ -249,6 +289,13 @@ public LimitSpec getLimitSpec()
return limitSpec;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
Contributor: What does this `@JsonInclude` do? Does it mean don't write it if it's null? That's kind of cool.

Contributor Author: exactly
@JsonProperty("subtotalsSpec")
public List<List<String>> getSubtotalsSpec()
{
return subtotalsSpec;
}

@Override
public boolean hasFilters()
{
@@ -329,6 +376,10 @@ private boolean validateAndGetForceLimitPushDown()

public boolean determineApplyLimitPushDown()
{
if (subtotalsSpec != null) {
return false;
}

final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();

if (limitSpec instanceof DefaultLimitSpec) {
@@ -628,6 +679,16 @@ public GroupByQuery withLimitSpec(LimitSpec limitSpec)
return new Builder(this).setLimitSpec(limitSpec).build();
}

public GroupByQuery withAggregatorSpecs(final List<AggregatorFactory> aggregatorSpecs)
{
return new Builder(this).setAggregatorSpecs(aggregatorSpecs).build();
}

public GroupByQuery withSubtotalsSpec(final List<List<String>> subtotalsSpec)
{
return new Builder(this).setSubtotalsSpec(subtotalsSpec).build();
}

public GroupByQuery withPostAggregatorSpecs(final List<PostAggregator> postAggregatorSpecs)
{
return new Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build();
@@ -687,6 +748,7 @@ public static class Builder

private Map<String, Object> context;

private List<List<String>> subtotalsSpec = null;
private LimitSpec limitSpec = null;
private Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
@@ -708,6 +770,7 @@ public Builder(GroupByQuery query)
postAggregatorSpecs = query.getPostAggregatorSpecs();
havingSpec = query.getHavingSpec();
limitSpec = query.getLimitSpec();
subtotalsSpec = query.subtotalsSpec;
postProcessingFn = query.postProcessingFn;
context = query.getContext();
}
@@ -788,6 +851,12 @@ public Builder setLimit(int limit)
return this;
}

public Builder setSubtotalsSpec(List<List<String>> subtotalsSpec)
{
this.subtotalsSpec = subtotalsSpec;
return this;
}

public Builder addOrderByColumn(String dimension)
{
return addOrderByColumn(dimension, null);
@@ -962,6 +1031,7 @@ public GroupByQuery build()
postAggregatorSpecs,
havingSpec,
theLimitSpec,
subtotalsSpec,
postProcessingFn,
context
);
@@ -218,9 +218,31 @@ private Sequence<Row> mergeGroupByResults(
finalizingResults = subqueryResult;
}

return groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults);
if (query.getSubtotalsSpec() != null) {
return groupByStrategy.processSubtotalsSpec(
query,
resource,
groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults)
Contributor: Should this be `query.withSubtotalsSpec(null)`?

Contributor Author: no, it's needed in the impl of `processSubtotalsSpec(..)`

);
} else {
return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult(
subquery,
query,
resource,
finalizingResults
), query);
}

} else {
return groupByStrategy.mergeResults(runner, query, context);
if (query.getSubtotalsSpec() != null) {
return groupByStrategy.processSubtotalsSpec(
query,
resource,
groupByStrategy.mergeResults(runner, query.withSubtotalsSpec(null), context)
);
} else {
return groupByStrategy.applyPostProcessing(groupByStrategy.mergeResults(runner, query, context), query);
}
}
}
