Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 44 additions & 99 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
Expand All @@ -61,7 +60,6 @@
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.select.PagingSpec;
import org.apache.druid.query.select.SelectQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.DimensionTopNMetricSpec;
Expand Down Expand Up @@ -93,7 +91,6 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
* A fully formed Druid query, built from a {@link PartialDruidQuery}. The work to develop this query is done
Expand Down Expand Up @@ -145,7 +142,13 @@ public DruidQuery(
// Now the fun begins.
this.filter = computeWhereFilter(partialQuery, plannerContext, sourceQuerySignature);
this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceQuerySignature);
this.grouping = computeGrouping(partialQuery, plannerContext, sourceQuerySignature, rexBuilder, finalizeAggregations);
this.grouping = computeGrouping(
partialQuery,
plannerContext,
sourceQuerySignature,
rexBuilder,
finalizeAggregations
);

final RowSignature sortingInputRowSignature;
if (this.selectProjection != null) {
Expand Down Expand Up @@ -436,9 +439,9 @@ private static ProjectRowOrderAndPostAggregations computePostAggregations(
/**
* Returns dimensions corresponding to {@code aggregate.getGroupSet()}, in the same order.
*
* @param partialQuery partial query
* @param plannerContext planner context
* @param querySignature source row signature and re-usable virtual column references
* @param partialQuery partial query
* @param plannerContext planner context
* @param querySignature source row signature and re-usable virtual column references
*
* @return dimensions
*
Expand All @@ -452,12 +455,19 @@ private static List<DimensionExpression> computeDimensions(
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<DimensionExpression> dimensions = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(querySignature.getRowSignature().getRowOrder()));
final String outputNamePrefix = Calcites.findUnusedPrefix(
"d",
new TreeSet<>(querySignature.getRowSignature().getRowOrder())
);
int outputNameCounter = 0;

for (int i : aggregate.getGroupSet()) {
// Dimension might need to create virtual columns. Avoid giving it a name that would lead to colliding columns.
final RexNode rexNode = Expressions.fromFieldAccess(querySignature.getRowSignature(), partialQuery.getSelectProject(), i);
final RexNode rexNode = Expressions.fromFieldAccess(
querySignature.getRowSignature(),
partialQuery.getSelectProject(),
i
);
final DruidExpression druidExpression = Expressions.toDruidExpression(
plannerContext,
querySignature.getRowSignature(),
Expand All @@ -478,7 +488,11 @@ private static List<DimensionExpression> computeDimensions(

final String dimOutputName;
if (!druidExpression.isSimpleExtraction()) {
virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(plannerContext, druidExpression, sqlTypeName);
virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
plannerContext,
druidExpression,
sqlTypeName
);
dimOutputName = virtualColumn.getOutputName();
} else {
dimOutputName = outputNamePrefix + outputNameCounter++;
Expand Down Expand Up @@ -515,7 +529,10 @@ private static List<Aggregation> computeAggregations(
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<Aggregation> aggregations = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(querySignature.getRowSignature().getRowOrder()));
final String outputNamePrefix = Calcites.findUnusedPrefix(
"a",
new TreeSet<>(querySignature.getRowSignature().getRowOrder())
);

for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
final String aggName = outputNamePrefix + i;
Expand Down Expand Up @@ -745,11 +762,6 @@ private Query computeQuery()
return scanQuery;
}

final SelectQuery selectQuery = toSelectQuery();
if (selectQuery != null) {
return selectQuery;
}

throw new CannotBuildQueryException("Cannot convert query parts into an actual query");
}

Expand Down Expand Up @@ -966,9 +978,12 @@ public ScanQuery toScanQuery()
// Scan cannot GROUP BY.
return null;
}

if (limitSpec != null && limitSpec.getColumns().size() > 0) {
// Scan cannot ORDER BY.
if (limitSpec != null &&
(limitSpec.getColumns().size() > 1
|| (limitSpec.getColumns().size() == 1 && !Iterables.getOnlyElement(limitSpec.getColumns())
.getDimension()
.equals(ColumnHolder.TIME_COLUMN_NAME)))) {
// Scan cannot ORDER BY non-time columns.
return null;
}

Expand All @@ -984,97 +999,27 @@ public ScanQuery toScanQuery()
? 0L
: (long) limitSpec.getLimit();

ScanQuery.Order order;
if (limitSpec == null || limitSpec.getColumns().size() == 0) {
order = ScanQuery.Order.NONE;
} else if (limitSpec.getColumns().get(0).getDirection() == OrderByColumnSpec.Direction.ASCENDING) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit of a tangent, but why again did we add a new ScanQuery.Order instead of re-using OrderByColumnSpec stuff?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we decided on adding a ScanQuery.Order enum because OrderByColumnSpec didn't really support a no-ordering case. See #7133 (comment)

order = ScanQuery.Order.ASCENDING;
} else {
order = ScanQuery.Order.DESCENDING;
}

return new ScanQuery(
dataSource,
filtration.getQuerySegmentSpec(),
selectProjection != null ? VirtualColumns.create(selectProjection.getVirtualColumns()) : VirtualColumns.EMPTY,
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
0,
scanLimit,
null, // Will default to "none"
order, // Will default to "none"
filtration.getDimFilter(),
Ordering.natural().sortedCopy(ImmutableSet.copyOf(outputRowSignature.getRowOrder())),
false,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
}

/**
* Return this query as a Select query, or null if this query is not compatible with Select.
*
* @return query or null
*/
@Nullable
public SelectQuery toSelectQuery()
{
if (grouping != null) {
return null;
}

final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
final boolean descending;
final int threshold;

if (limitSpec != null) {
// Safe to assume limitSpec has zero or one entry; DruidSelectSortRule wouldn't push in anything else.
if (limitSpec.getColumns().size() == 0) {
descending = false;
} else if (limitSpec.getColumns().size() == 1) {
final OrderByColumnSpec orderBy = Iterables.getOnlyElement(limitSpec.getColumns());
if (!orderBy.getDimension().equals(ColumnHolder.TIME_COLUMN_NAME)) {
// Select cannot handle sorting on anything other than __time.
return null;
}
descending = orderBy.getDirection() == OrderByColumnSpec.Direction.DESCENDING;
} else {
// Select cannot handle sorting on more than one column.
return null;
}

threshold = limitSpec.getLimit();
} else {
descending = false;
threshold = 0;
}

// We need to ask for dummy columns to prevent Select from returning all of them.
String dummyColumn = "dummy";
while (sourceQuerySignature.getRowSignature().getColumnType(dummyColumn) != null
|| outputRowSignature.getRowOrder().contains(dummyColumn)) {
dummyColumn = dummyColumn + "_";
}

final List<String> metrics = new ArrayList<>();

if (selectProjection != null) {
metrics.addAll(selectProjection.getDirectColumns());
metrics.addAll(selectProjection.getVirtualColumns()
.stream()
.map(VirtualColumn::getOutputName)
.collect(Collectors.toList()));
} else {
// No projection, rowOrder should reference direct columns.
metrics.addAll(outputRowSignature.getRowOrder());
}

if (metrics.isEmpty()) {
metrics.add(dummyColumn);
}

// Not used for actual queries (will be replaced by QueryMaker) but the threshold is important for the planner.
final PagingSpec pagingSpec = new PagingSpec(null, threshold);

return new SelectQuery(
dataSource,
filtration.getQuerySegmentSpec(),
descending,
filtration.getDimFilter(),
Granularities.ALL,
ImmutableList.of(new DefaultDimensionSpec(dummyColumn, dummyColumn)),
metrics.stream().sorted().distinct().collect(Collectors.toList()),
getVirtualColumns(true),
pagingSpec,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
}
}
Loading