Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,35 +199,42 @@ public boolean apply(AggregatorFactory agg)
final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
.build();
final IncrementalIndex index = makeIncrementalIndex(innerQuery, subqueryResult);

final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(innerQuery, subqueryResult);

//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
//is ensured by QuerySegmentSpec.
//GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval
//and concatenate the results.
return new ResourceClosingSequence<>(
outerQuery.applyLimit(
Sequences.concat(
Sequences.map(
Sequences.simple(outerQuery.getIntervals()),
new Function<Interval, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(Interval interval)
{
return engine.process(
outerQuery.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
),
new IncrementalIndexStorageAdapter(index)
);
}
}
)
final IncrementalIndex outerQueryResultIndex = makeIncrementalIndex(
outerQuery,
Sequences.concat(
Sequences.map(
Sequences.simple(outerQuery.getIntervals()),
new Function<Interval, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(Interval interval)
{
return engine.process(
outerQuery.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
),
new IncrementalIndexStorageAdapter(innerQueryResultIndex)
);
}
}
)
),
index
)
);

innerQueryResultIndex.close();

return new ResourceClosingSequence<>(
outerQuery.applyLimit(postAggregate(query, outerQueryResultIndex)),
outerQueryResultIndex
);

} else {
final IncrementalIndex index = makeIncrementalIndex(
query, runner.run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2809,6 +2809,63 @@ public void testSubqueryWithMultipleIntervalsInOuterQuery()
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
public void testSubqueryWithExtractionFnInOuterQuery()
{
//https://github.com/druid-io/druid/issues/2556

GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();

GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")
)
)
)
.setDimensions(Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"alias",
"alias",
new RegexDimExtractionFn("(a).*", true, "a")
)
)
)
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new LongSumAggregatorFactory("rows", "rows"),
new LongSumAggregatorFactory("idx", "idx")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();

List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "a", "rows", 13L, "idx", 6619L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "a", "rows", 13L, "idx", 5827L)
);

// Subqueries are handled by the ToolChest
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
public void testDifferentGroupingSubquery()
{
Expand Down