Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Chars;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexLiteral;
Expand Down Expand Up @@ -378,4 +379,37 @@ public static String makePrefixedName(final String prefix, final String suffix)
{
return StringUtils.format("%s:%s", prefix, suffix);
}

public static int getInt(RexNode rex, int defaultValue)
{
return rex == null ? defaultValue : RexLiteral.intValue(rex);
}

public static int getOffset(Sort sort)
{
return Calcites.getInt(sort.offset, 0);
}

public static int getFetch(Sort sort)
{
return Calcites.getInt(sort.fetch, -1);
}

public static int collapseFetch(int innerFetch, int outerFetch, int outerOffset)
{
final int fetch;
if (innerFetch < 0 && outerFetch < 0) {
// Neither has a limit => no limit overall.
fetch = -1;
} else if (innerFetch < 0) {
// Outer limit only.
fetch = outerFetch;
} else if (outerFetch < 0) {
// Inner limit only.
fetch = Math.max(0, innerFetch - outerOffset);
} else {
fetch = Math.max(0, Math.min(innerFetch - outerOffset, outerFetch));
}
return fetch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package org.apache.druid.sql.calcite.planner;

import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.interpreter.BindableConvention;
Expand All @@ -35,6 +35,7 @@
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
Expand Down Expand Up @@ -139,31 +140,21 @@ private PlannerResult planWithDruidConvention(
if (explain != null) {
return planExplanation(druidRel, explain, dataSourceNames);
} else {
final Supplier<Sequence<Object[]>> resultsSupplier = new Supplier<Sequence<Object[]>>()
{
@Override
public Sequence<Object[]> get()
{
if (root.isRefTrivial()) {
return druidRel.runQuery();
} else {
// Add a mapping on top to accommodate root.fields.
return Sequences.map(
druidRel.runQuery(),
new Function<Object[], Object[]>()
{
@Override
public Object[] apply(final Object[] input)
{
final Object[] retVal = new Object[root.fields.size()];
for (int i = 0; i < root.fields.size(); i++) {
retVal[i] = input[root.fields.get(i).getKey()];
}
return retVal;
}
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
if (root.isRefTrivial()) {
return druidRel.runQuery();
} else {
// Add a mapping on top to accommodate root.fields.
return Sequences.map(
druidRel.runQuery(),
input -> {
final Object[] retVal = new Object[root.fields.size()];
for (int i = 0; i < root.fields.size(); i++) {
retVal[i] = input[root.fields.get(i).getKey()];
}
);
}
return retVal;
}
);
}
};

Expand Down Expand Up @@ -212,9 +203,9 @@ private PlannerResult planWithBindableConvention(
new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
{
@Override
public EnumeratorIterator make()
public EnumeratorIterator<Object[]> make()
{
return new EnumeratorIterator(new Iterator<Object[]>()
return new EnumeratorIterator<>(new Iterator<Object[]>()
{
@Override
public boolean hasNext()
Expand All @@ -236,16 +227,19 @@ public void cleanup(EnumeratorIterator iterFromMake)

}
}
), () -> enumerator.close());
), enumerator::close);
};
return new PlannerResult(resultsSupplier, root.validatedRowType, ImmutableSet.of());
}
}

/**
* This method wraps the root with a logical sort that applies a limit (no ordering change).
* The CTX_SQL_OUTER_LIMIT flag that controls this wrapping is meant for internal use only by the
* web console, allowing it to apply a limit to queries without rewriting the original SQL.
* This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
* is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in
* {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
*
* The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
* the web console, allowing it to apply a limit to queries without rewriting the original SQL.
*
* @param root root node
* @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
Expand All @@ -261,6 +255,23 @@ private RelNode possiblyWrapRootWithOuterLimitFromContext(
return root.rel;
}

if (root.rel instanceof Sort) {
Sort innerSort = (Sort) root.rel;
final int offset = Calcites.getOffset(innerSort);
final int fetch = Calcites.collapseFetch(
Calcites.getFetch(innerSort),
Ints.checkedCast(outerLimit),
0
);

return LogicalSort.create(
innerSort.getInput(),
innerSort.collation,
makeBigIntLiteral(offset),
makeBigIntLiteral(fetch)
);
}

return LogicalSort.create(
root.rel,
root.collation,
Expand All @@ -282,7 +293,7 @@ private static class EnumeratorIterator<T> implements Iterator<T>
{
private final Iterator<T> it;

public EnumeratorIterator(Iterator<T> it)
EnumeratorIterator(Iterator<T> it)
{
this.it = it;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexLiteral;
import org.apache.druid.sql.calcite.planner.Calcites;

/**
* Collapses two adjacent Sort operations together. Useful for queries like
Expand All @@ -45,48 +45,30 @@ public static SortCollapseRule instance()
@Override
public void onMatch(final RelOptRuleCall call)
{
// First is the inner sort, second is the outer sort.
final Sort first = call.rel(1);
final Sort second = call.rel(0);
final Sort outerSort = call.rel(0);
final Sort innerSort = call.rel(1);

if (second.collation.getFieldCollations().isEmpty()
|| second.collation.getFieldCollations().equals(first.collation.getFieldCollations())) {
// Add up the offsets.
final int firstOffset = (first.offset != null ? RexLiteral.intValue(first.offset) : 0);
final int secondOffset = (second.offset != null ? RexLiteral.intValue(second.offset) : 0);

final int offset = firstOffset + secondOffset;
final int fetch;
if (outerSort.collation.getFieldCollations().isEmpty()
|| outerSort.collation.getFieldCollations().equals(innerSort.collation.getFieldCollations())) {
final int innerOffset = Calcites.getOffset(innerSort);
final int innerFetch = Calcites.getFetch(innerSort);
final int outerOffset = Calcites.getOffset(outerSort);
final int outerFetch = Calcites.getFetch(outerSort);

if (first.fetch == null && second.fetch == null) {
// Neither has a limit => no limit overall.
fetch = -1;
} else if (first.fetch == null) {
// Outer limit only.
fetch = RexLiteral.intValue(second.fetch);
} else if (second.fetch == null) {
// Inner limit only.
fetch = Math.max(0, RexLiteral.intValue(first.fetch) - secondOffset);
} else {
fetch = Math.max(
0,
Math.min(
RexLiteral.intValue(first.fetch) - secondOffset,
RexLiteral.intValue(second.fetch)
)
);
}
// Add up the offsets.
final int offset = innerOffset + outerOffset;
final int fetch = Calcites.collapseFetch(innerFetch, outerFetch, outerOffset);

final Sort combined = first.copy(
first.getTraitSet(),
first.getInput(),
first.getCollation(),
final Sort combined = innerSort.copy(
innerSort.getTraitSet(),
innerSort.getInput(),
innerSort.getCollation(),
offset == 0 ? null : call.builder().literal(offset),
fetch < 0 ? null : call.builder().literal(fetch)
);

call.transformTo(combined);
call.getPlanner().setImportance(second, 0.0);
call.getPlanner().setImportance(outerSort, 0.0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public int getMaxSemiJoinRowsInMemory()
// Matches QUERY_CONTEXT_LOS_ANGELES
public static final Map<String, Object> TIMESERIES_CONTEXT_LOS_ANGELES = new HashMap<>();

public static final Map<String, Object> OUTER_LIMIT_CONTEXT = new HashMap<>(QUERY_CONTEXT_DEFAULT);

public static QueryRunnerFactoryConglomerate conglomerate;
public static Closer resourceCloser;

Expand All @@ -236,6 +238,8 @@ public int getMaxSemiJoinRowsInMemory()
TIMESERIES_CONTEXT_LOS_ANGELES.put("skipEmptyBuckets", true);
TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS);
TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);

OUTER_LIMIT_CONTEXT.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 2);
}

// Generate timestamps for expected results
Expand Down
Loading