Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.planner.normalize.Normalizer;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rewrite.mvrewrite.MVExprEquivalent;
import org.apache.doris.statistics.ExprStats;
Expand Down Expand Up @@ -996,8 +997,12 @@ public void setFn(Function fn) {
this.fn = fn;
}

// Append a flattened version of this expr, including all children, to 'container'.
protected void treeToThriftHelper(TExpr container) {
treeToThriftHelper(container, ((expr, exprNode) -> expr.toThrift(exprNode)));
}

// Append a flattened version of this expr, including all children, to 'container'.
protected void treeToThriftHelper(TExpr container, ExprVisitor visitor) {
TExprNode msg = new TExprNode();
msg.type = type.toThrift();
msg.num_children = children.size();
Expand All @@ -1009,13 +1014,17 @@ protected void treeToThriftHelper(TExpr container) {
}
msg.output_scale = getOutputScale();
msg.setIsNullable(nullableFromNereids.isPresent() ? nullableFromNereids.get() : isNullable());
toThrift(msg);
visitor.visit(this, msg);
container.addToNodes(msg);
for (Expr child : children) {
child.treeToThriftHelper(container);
child.treeToThriftHelper(container, visitor);
}
}

public interface ExprVisitor {
void visit(Expr expr, TExprNode exprNode);
}

public static Type getAssignmentCompatibleType(List<Expr> children) {
Type assignmentCompatibleType = Type.INVALID;
for (int i = 0; i < children.size()
Expand Down Expand Up @@ -2193,6 +2202,16 @@ public String getStringValueForArray(FormatOptions options) {
return null;
}

public final TExpr normalize(Normalizer normalizer) {
TExpr result = new TExpr();
treeToThriftHelper(result, (expr, texprNode) -> expr.normalize(texprNode, normalizer));
return result;
}

protected void normalize(TExprNode msg, Normalizer normalizer) {
this.toThrift(msg);
}

public static Expr getFirstBoundChild(Expr expr, List<TupleId> tids) {
for (Expr child : expr.getChildren()) {
if (child.isBoundByTupleIds(tids)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.normalize.Normalizer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TExprNode;
Expand Down Expand Up @@ -2406,6 +2407,17 @@ public void readFields(DataInput in) throws IOException {
isMergeAggFn = in.readBoolean();
}

@Override
protected void normalize(TExprNode msg, Normalizer normalizer) {
String functionName = fnName.getFunction().toUpperCase();
if (FunctionSet.nonDeterministicFunctions.contains(functionName)
|| "NOW".equals(functionName)
|| (FunctionSet.nonDeterministicTimeFunctions.contains(functionName) && children.isEmpty())) {
throw new IllegalStateException("Can not normalize non deterministic functions");
}
super.normalize(msg, normalizer);
}

public static FunctionCallExpr read(DataInput in) throws IOException {
FunctionCallExpr func = new FunctionCallExpr();
func.readFields(in);
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.ToSqlContext;
import org.apache.doris.planner.normalize.Normalizer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TExprNodeType;
Expand Down Expand Up @@ -353,6 +354,17 @@ protected void toThrift(TExprNode msg) {
msg.setLabel(label);
}

@Override
protected void normalize(TExprNode msg, Normalizer normalizer) {
msg.node_type = TExprNodeType.SLOT_REF;
// we should eliminate the different tuple id to reuse query cache
msg.slot_ref = new TSlotRef(
normalizer.normalizeSlotId(desc.getId().asInt()),
0
);
msg.slot_ref.setColUniqueId(desc.getUniqueId());
}

@Override
public void markAgg() {
desc.setIsAgg(true);
Expand Down
26 changes: 26 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,32 @@ public boolean isNullResultWithOneNullParamFunctions(String funcName) {
.put(Type.DECIMAL128, Type.DECIMAL128)
.build();

public static final Set<String> nonDeterministicFunctions =
ImmutableSet.<String>builder()
.add("RAND")
.add("RANDOM")
.add("RANDOM_BYTES")
.add("CONNECTION_ID")
.add("DATABASE")
.add("USER")
.add("UUID")
.add("CURRENT_USER")
.add("UUID_NUMERIC")
.build();

public static final Set<String> nonDeterministicTimeFunctions =
ImmutableSet.<String>builder()
.add("NOW")
.add("CURDATE")
.add("CURRENT_DATE")
.add("UTC_TIMESTAMP")
.add("CURTIME")
.add("CURRENT_TIMESTAMP")
.add("CURRENT_TIME")
.add("UNIX_TIMESTAMP")
.add()
.build();

private static final Map<Type, String> STDDEV_UPDATE_SYMBOL =
ImmutableMap.<Type, String>builder()
.put(Type.TINYINT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
Expand Down Expand Up @@ -304,6 +306,64 @@ public int compareTo(PartitionKey other) {
return Integer.compare(thisKeyLen, otherKeyLen);
}

public PartitionKey successor() throws AnalysisException {
Preconditions.checkState(
keys.size() == 1,
"Only support compute successor for one partition column"
);
LiteralExpr literal = keys.get(0);
PrimitiveType type = types.get(0);

PartitionKey successor = new PartitionKey();

switch (type) {
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
long maxValueOfType = (1L << ((type.getSlotSize() << 3 /* multiply 8 bit */) - 1)) - 1L;
long successorInt = ((IntLiteral) literal).getValue();
successorInt += successorInt < maxValueOfType ? 1 : 0;
successor.pushColumn(new IntLiteral(successorInt, Type.fromPrimitiveType(type)), type);
return successor;
case LARGEINT:
BigInteger maxValue = BigInteger.ONE.shiftLeft(127).subtract(BigInteger.ONE);
BigInteger successorLargeInt = (BigInteger) literal.getRealValue();
successorLargeInt = successorLargeInt.add(
successorLargeInt.compareTo(maxValue) < 0 ? BigInteger.ONE : BigInteger.ZERO
);
successor.pushColumn(new LargeIntLiteral(successorLargeInt), type);
return successor;
case DATE:
case DATEV2:
case DATETIME:
case DATETIMEV2:
DateLiteral dateLiteral = (DateLiteral) literal;
LocalDateTime successorDateTime = LocalDateTime.of(
(int) dateLiteral.getYear(),
(int) dateLiteral.getMonth(),
(int) dateLiteral.getDay(),
(int) dateLiteral.getHour(),
(int) dateLiteral.getMinute(),
(int) dateLiteral.getSecond(),
(int) dateLiteral.getMicrosecond() * 1000
);
if (type == PrimitiveType.DATE || type == PrimitiveType.DATEV2) {
successorDateTime = successorDateTime.plusDays(1);
} else if (type == PrimitiveType.DATETIME) {
successorDateTime = successorDateTime.plusSeconds(1);
} else {
int scale = Math.min(6, Math.max(0, ((ScalarType) literal.getType()).getScalarScale()));
long nanoSeconds = BigInteger.TEN.pow(9 - scale).longValue();
successorDateTime = successorDateTime.plusNanos(nanoSeconds);
}
successor.pushColumn(new DateLiteral(successorDateTime, literal.getType()), type);
return successor;
default:
throw new AnalysisException("Unsupported type: " + type);
}
}

// return: ("100", "200", "300")
public String toSql() {
StringBuilder sb = new StringBuilder("(");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -66,6 +67,21 @@ public RangePartitionInfo(boolean isAutoCreatePartitions, ArrayList<Expr> exprs,
}
}

public Map<Long, PartitionItem> getPartitionItems(Collection<Long> partitionIds) {
Map<Long, PartitionItem> columnRanges = Maps.newLinkedHashMapWithExpectedSize(partitionIds.size());
for (Long partitionId : partitionIds) {
PartitionItem partitionItem = idToItem.get(partitionId);
if (partitionItem == null) {
partitionItem = idToTempItem.get(partitionId);
}
if (partitionItem == null) {
throw new IllegalStateException("Can not found partition item: " + partitionId);
}
columnRanges.put(partitionId, partitionItem);
}
return columnRanges;
}

@Override
public PartitionItem createAndCheckPartitionItem(SinglePartitionDesc desc, boolean isTemp) throws DdlException {
Range<PartitionKey> newRange = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.nereids.CascadesContext.Lock;
import org.apache.doris.nereids.exceptions.AnalysisException;
Expand Down Expand Up @@ -65,16 +66,20 @@
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.normalize.QueryCacheNormalizer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TQueryCacheParam;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.codec.binary.Hex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -343,10 +348,11 @@ private void splitFragments(PhysicalPlan resultPlan) {
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsTranslateTime();
}
if (cascadesContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) {
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
if (sessionVariable.isEnableNereidsTrace()) {
CounterEvent.clearCounter();
}
if (cascadesContext.getConnectContext().getSessionVariable().isPlayNereidsDump()) {
if (sessionVariable.isPlayNereidsDump()) {
return;
}
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan);
Expand All @@ -355,8 +361,31 @@ private void splitFragments(PhysicalPlan resultPlan) {
physicalRelations.addAll(planTranslatorContext.getPhysicalRelations());
descTable = planTranslatorContext.getDescTable();
fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());

boolean enableQueryCache = sessionVariable.getEnableQueryCache();
String queryId = DebugUtil.printId(cascadesContext.getConnectContext().queryId());
for (int seq = 0; seq < fragments.size(); seq++) {
fragments.get(seq).setFragmentSequenceNum(seq);
PlanFragment fragment = fragments.get(seq);
fragment.setFragmentSequenceNum(seq);
if (enableQueryCache) {
try {
QueryCacheNormalizer normalizer = new QueryCacheNormalizer(fragment, descTable);
Optional<TQueryCacheParam> queryCacheParam =
normalizer.normalize(cascadesContext.getConnectContext());
if (queryCacheParam.isPresent()) {
fragment.queryCacheParam = queryCacheParam.get();
// after commons-codec 1.14 (include), Hex.encodeHexString will change ByteBuffer.pos,
// so we should copy a new byte buffer to print it
ByteBuffer digestCopy = fragment.queryCacheParam.digest.duplicate();
LOG.info("Use query cache for fragment {}, node id: {}, digest: {}, queryId: {}",
seq,
fragment.queryCacheParam.node_id,
Hex.encodeHexString(digestCopy), queryId);
}
} catch (Throwable t) {
// do nothing
}
}
}
// set output exprs
logicalPlanAdapter.setResultExprs(root.getOutputExprs());
Expand Down
65 changes: 65 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,71 @@ private static <T> List<List<T>> doAllCombinations(List<List<T>> lists) {
).collect(ImmutableList.toImmutableList());
}

/** getAllCombinations */
public static <T> List<List<T>> getAllCombinations(List<T> list, int itemNum) {
List<List<T>> result = Lists.newArrayList();
generateCombinations(list, itemNum, 0, Lists.newArrayList(), result);
return result;
}

private static <T> void generateCombinations(
List<T> list, int n, int start, List<T> current, List<List<T>> result) {
if (current.size() == n) {
result.add(new ArrayList<>(current));
return;
}

for (int i = start; i < list.size(); i++) {
current.add(list.get(i));
generateCombinations(list, n, i + 1, current, result);
current.remove(current.size() - 1);
}
}

public static <T> List<List<T>> allPermutations(List<T> list) {
List<List<T>> result = new ArrayList<>();
generatePermutations(new ArrayList<>(list), new ArrayList<>(), result);
return result;
}

private static <T> void generatePermutations(List<T> list, List<T> current, List<List<T>> result) {
if (!current.isEmpty()) {
result.add(new ArrayList<>(current));
}

for (int i = 0; i < list.size(); i++) {
T element = list.remove(i);
current.add(element);
generatePermutations(list, current, result);
current.remove(current.size() - 1);
list.add(i, element);
}
}

/** permutations */
public static <T> List<List<T>> permutations(List<T> list) {
list = new ArrayList<>(list);
List<List<T>> result = new ArrayList<>();
if (list.isEmpty()) {
result.add(new ArrayList<>());
return result;
}

T firstElement = list.get(0);
List<T> rest = list.subList(1, list.size());
List<List<T>> recursivePermutations = permutations(rest);

for (List<T> smallerPermutated : recursivePermutations) {
for (int index = 0; index <= smallerPermutated.size(); index++) {
List<T> temp = new ArrayList<>(smallerPermutated);
temp.add(index, firstElement);
result.add(temp);
}
}

return result;
}

public static <T> List<T> copyRequiredList(List<T> list) {
return ImmutableList.copyOf(Objects.requireNonNull(list, "non-null list is required"));
}
Expand Down
Loading