Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions processing/src/main/java/io/druid/segment/VirtualColumns.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.java.util.common.Cacheable;
Expand Down Expand Up @@ -70,6 +71,11 @@ public static Pair<String, String> splitColumnName(String columnName)
}
}

// Convenience varargs overload of the List-based factory: copies the varargs
// array into a fresh mutable ArrayList (Lists.newArrayList) before delegating,
// so the delegate never aliases the caller's array.
public static VirtualColumns create(VirtualColumn...virtualColumns)
{
return create(Lists.newArrayList(virtualColumns));
}

@JsonCreator
public static VirtualColumns create(List<VirtualColumn> virtualColumns)
{
Expand Down
15 changes: 13 additions & 2 deletions sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,12 @@ public TimeseriesQuery toTimeseriesQuery()
}

final Filtration filtration = Filtration.create(filter).optimize(sourceRowSignature);

final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
if (sortProject != null) {
postAggregators.addAll(sortProject.getPostAggregators());
}

final Map<String, Object> theContext = Maps.newHashMap();
theContext.put("skipEmptyBuckets", true);
theContext.putAll(plannerContext.getQueryContext());
Expand All @@ -790,7 +796,7 @@ public TimeseriesQuery toTimeseriesQuery()
filtration.getDimFilter(),
queryGranularity,
grouping.getAggregatorFactories(),
grouping.getPostAggregators(),
postAggregators,
ImmutableSortedMap.copyOf(theContext)
);
}
Expand Down Expand Up @@ -849,6 +855,11 @@ public TopNQuery toTopNQuery()

final Filtration filtration = Filtration.create(filter).optimize(sourceRowSignature);

final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
if (sortProject != null) {
postAggregators.addAll(sortProject.getPostAggregators());
}

return new TopNQuery(
dataSource,
getVirtualColumns(plannerContext.getExprMacroTable(), true),
Expand All @@ -859,7 +870,7 @@ public TopNQuery toTopNQuery()
filtration.getDimFilter(),
Granularities.ALL,
grouping.getAggregatorFactories(),
grouping.getPostAggregators(),
postAggregators,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
}
Expand Down
99 changes: 99 additions & 0 deletions sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import io.druid.query.topn.InvertedTopNMetricSpec;
import io.druid.query.topn.NumericTopNMetricSpec;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
import io.druid.segment.virtual.ExpressionVirtualColumn;
Expand Down Expand Up @@ -6721,6 +6722,104 @@ public void testSortProjectAfterNestedGroupBy() throws Exception
);
}

@Test
public void testPostAggWithTimeseries() throws Exception
{
// Verifies that a projected arithmetic on aggregates (SUM(m1) + SUM(m2))
// is planned as a post-aggregator on the generated native TimeseriesQuery,
// and that the DESC ordering on the YEAR floor maps to descending(true).
testQuery(
"SELECT "
+ " FLOOR(__time TO YEAR), "
+ " SUM(m1), "
+ " SUM(m1) + SUM(m2) "
+ "FROM "
+ " druid.foo "
+ "WHERE "
+ " dim2 = 'a' "
+ "GROUP BY FLOOR(__time TO YEAR) "
+ "ORDER BY FLOOR(__time TO YEAR) desc",
// Expected native query: the YEAR grouping becomes query granularity, the
// WHERE clause a selector filter, and the two SUMs plain aggregators named
// a0/a1 with the addition expressed as post-aggregator p0 over them.
Collections.singletonList(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.filters(SELECTOR("dim2", "a", null))
.granularity(Granularities.YEAR)
.aggregators(
AGGS(
new DoubleSumAggregatorFactory("a0", "m1"),
new DoubleSumAggregatorFactory("a1", "m2")
)
)
.postAggregators(
EXPRESSION_POST_AGG("p0", "(\"a0\" + \"a1\")")
)
.descending(true)
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
// Expected rows: one per year bucket (epoch millis), newest first.
ImmutableList.of(
new Object[]{978307200000L, 4.0, 8.0},
new Object[]{946684800000L, 1.0, 2.0}
)
);
}

@Test
public void testPostAggWithTopN() throws Exception
{
// Verifies that a grouped/ordered/limited query planned as a native TopNQuery
// carries both the AVG decomposition (sum + count with a quotient
// post-aggregator) and the projected SUM(m1) + SUM(m2) post-aggregator.
testQuery(
"SELECT "
+ " FLOOR(__time TO SECOND), "
+ " AVG(m2), "
+ " SUM(m1) + SUM(m2) "
+ "FROM "
+ " druid.foo "
+ "WHERE "
+ " dim2 = 'a' "
+ "GROUP BY FLOOR(__time TO SECOND) "
+ "ORDER BY FLOOR(__time TO SECOND) "
+ "LIMIT 5",
// Expected native query: the SECOND floor becomes a virtual column d0:v
// used as the (numeric) TopN dimension; LIMIT 5 becomes the threshold and
// the ORDER BY a numeric dimension metric spec.
Collections.singletonList(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG))
.virtualColumns(
VirtualColumns.create(
EXPRESSION_VIRTUAL_COLUMN("d0:v", "timestamp_floor(\"__time\",'PT1S','','UTC')", ValueType.LONG)
)
)
.filters("dim2", "a")
// AVG(m2) is decomposed into a0:sum / a0:count; a1 and a2 feed the
// projected addition post-aggregator p0.
.aggregators(AGGS(
new DoubleSumAggregatorFactory("a0:sum", "m2"),
new CountAggregatorFactory("a0:count"),
new DoubleSumAggregatorFactory("a1", "m1"),
new DoubleSumAggregatorFactory("a2", "m2")
))
.postAggregators(
ImmutableList.of(
new ArithmeticPostAggregator(
"a0",
"quotient",
ImmutableList.of(
new FieldAccessPostAggregator(null, "a0:sum"),
new FieldAccessPostAggregator(null, "a0:count")
)
),
EXPRESSION_POST_AGG("p0", "(\"a1\" + \"a2\")")
)
)
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
.threshold(5)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
// Expected rows: one per one-second bucket (epoch millis), ascending.
ImmutableList.of(
new Object[]{946684800000L, 1.0, 2.0},
new Object[]{978307200000L, 4.0, 8.0}
)
);
}

private void testQuery(
final String sql,
final List<Query> expectedQueries,
Expand Down