Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/http/action/mini_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ Status MiniLoadAction::check_auth(HttpRequest* http_req) {
return status;
}
client->loadCheck(res, req);
_user.assign(req.user);
_cluster.assign(cluster);
}
} catch (apache::thrift::TException& e) {
// failed when retry.
Expand Down
8 changes: 4 additions & 4 deletions fe/src/com/baidu/palo/analysis/AggregateInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ private void createDistinctAggInfo(

public ArrayList<FunctionCallExpr> getMaterializedAggregateExprs() {
ArrayList<FunctionCallExpr> result = Lists.newArrayList();
for (Integer i : materializedAggregateSlots_) {
result.add(aggregateExprs_.get(i));
for (Integer i: materializedSlots_) {
result.add(aggregateExprs_.get(i));
}
return result;
}
Expand Down Expand Up @@ -725,8 +725,8 @@ public void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap
// over query statements, if aggregate functions contain count(*), now
// materialize all slots this SelectStmt maps.
// chenhao added.
if (hasCountStar) {
resolvedExprs = smap.getRhs();
if (hasCountStar && smap != null && smap.size() > 0) {
resolvedExprs.addAll(smap.getRhs());
}
analyzer.materializeSlots(resolvedExprs);

Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/analysis/AnalyticExpr.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ private void standardize(Analyzer analyzer) throws AnalysisException {
// -1 indicates that no NULL values are inserted even though we set the end
// bound to the start bound (which is PRECEDING) below; this is different from
// the default behavior of windows with an end bound PRECEDING.
paramExprs.add(new DecimalLiteral(BigDecimal.valueOf(-1)));
paramExprs.add(new IntLiteral(-1, Type.BIGINT));
}

window = new AnalyticWindow(window.getType(),
Expand Down
4 changes: 4 additions & 0 deletions fe/src/com/baidu/palo/analysis/LimitElement.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public long getOffset() {
return offset;
}

public boolean hasOffset() {
return offset != 0;
}

public String toSql() {
if (limit == -1) {
return "";
Expand Down
68 changes: 35 additions & 33 deletions fe/src/com/baidu/palo/analysis/QueryStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,20 @@ public List<TupleId> getCorrelatedTupleIds(Analyzer analyzer)
collectTableRefs(tblRefs);
for (TableRef tblRef: tblRefs) {
if (absoluteRef == null && !tblRef.isRelative()) absoluteRef = tblRef;
if (tblRef.isCorrelated()) {
/*
// Check if the correlated table ref is rooted at a tuple descriptor from within
// this query stmt. If so, the correlation is contained within this stmt
// and the table ref does not conflict with absolute refs.
CollectionTableRef t = (CollectionTableRef) tblRef;
Preconditions.checkState(t.getResolvedPath().isRootedAtTuple());
// This check relies on tblRefs being in depth-first order.
if (!tblRefIds.contains(t.getResolvedPath().getRootDesc().getId())) {
if (correlatedRef == null) correlatedRef = tblRef;
correlatedTupleIds.add(t.getResolvedPath().getRootDesc().getId());
}
*/
}
/*if (tblRef.isCorrelated()) {
*
* // Check if the correlated table ref is rooted at a tuple descriptor from within
* // this query stmt. If so, the correlation is contained within this stmt
* // and the table ref does not conflict with absolute refs.
* CollectionTableRef t = (CollectionTableRef) tblRef;
* Preconditions.checkState(t.getResolvedPath().isRootedAtTuple());
* // This check relies on tblRefs being in depth-first order.
* if (!tblRefIds.contains(t.getResolvedPath().getRootDesc().getId())) {
* if (correlatedRef == null) correlatedRef = tblRef;
* correlatedTupleIds.add(t.getResolvedPath().getRootDesc().getId());
* }
*
}*/
if (correlatedRef != null && absoluteRef != null) {
throw new AnalysisException(String.format(
"Nested query is illegal because it contains a table reference '%s' " +
Expand Down Expand Up @@ -234,23 +234,25 @@ protected void createSortInfo(Analyzer analyzer) throws AnalysisException {
sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams);
// order by w/o limit and offset in inline views, union operands and insert statements
// are ignored.
if (!hasLimit() && !hasOffset() && !analyzer.isRootAnalyzer()) {
evaluateOrderBy = false;
// Return a warning that the order by was ignored.
StringBuilder strBuilder = new StringBuilder();
strBuilder.append("Ignoring ORDER BY clause without LIMIT or OFFSET: ");
strBuilder.append("ORDER BY ");
strBuilder.append(orderByElements.get(0).toSql());
for (int i = 1; i < orderByElements.size(); ++i) {
strBuilder.append(", ").append(orderByElements.get(i).toSql());
}
strBuilder.append(".\nAn ORDER BY appearing in a view, subquery, union operand, ");
strBuilder.append("or an insert/ctas statement has no effect on the query result ");
strBuilder.append("unless a LIMIT and/or OFFSET is used in conjunction ");
strBuilder.append("with the ORDER BY.");
} else {
evaluateOrderBy = true;
}
// TODO chenhao, open this when we don't limit rows subquery returns by SortNode.
/*if (!hasLimit() && !hasOffset() && !analyzer.isRootAnalyzer()) {
* evaluateOrderBy = false;
* // Return a warning that the order by was ignored.
* StringBuilder strBuilder = new StringBuilder();
* strBuilder.append("Ignoring ORDER BY clause without LIMIT or OFFSET: ");
* strBuilder.append("ORDER BY ");
* strBuilder.append(orderByElements.get(0).toSql());
* for (int i = 1; i < orderByElements.size(); ++i) {
* strBuilder.append(", ").append(orderByElements.get(i).toSql());
* }
* strBuilder.append(".\nAn ORDER BY appearing in a view, subquery, union operand, ");
* strBuilder.append("or an insert/ctas statement has no effect on the query result ");
* strBuilder.append("unless a LIMIT and/or OFFSET is used in conjunction ");
* strBuilder.append("with the ORDER BY.");
* } else {
*/
evaluateOrderBy = true;
//}
}

/**
Expand Down Expand Up @@ -400,8 +402,8 @@ public void removeOrderByElements() {
public boolean hasOrderByClause() {
return orderByElements != null;
}
public boolean hasLimit() { return limitElement != null; }
public boolean hasOffset() { return limitElement != null && limitElement.getOffset() != 0; }
public boolean hasLimit() { return limitElement != null && limitElement.hasLimit(); }
public boolean hasOffset() { return limitElement != null && limitElement.hasOffset(); }

public long getLimit() {
return limitElement.getLimit();
Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/analysis/SelectStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public void materializeRequiredSlots(Analyzer analyzer) throws AnalysisException
analyzer.getUnassignedConjuncts(getTableRefIds(), true);
List<Expr> unassignedJoinConjuncts = Lists.newArrayList();
for (Expr e: unassigned) {
if (analyzer.evalByJoin(e)) {
if (analyzer.evalAfterJoin(e)) {
unassignedJoinConjuncts.add(e);
}
}
Expand Down
5 changes: 2 additions & 3 deletions fe/src/com/baidu/palo/planner/AggregationNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ protected void toThrift(TPlanNode msg) {

List<TExpr> aggregateFunctions = Lists.newArrayList();
// only serialize agg exprs that are being materialized
//for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) {
for (FunctionCallExpr e : aggInfo.getAggregateExprs()) {
for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) {
aggregateFunctions.add(e.treeToThrift());
}
msg.agg_node =
Expand All @@ -254,7 +253,7 @@ protected void toThrift(TPlanNode msg) {
@Override
protected String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
if (aggInfo.getAggregateExprs() != null && aggInfo.getAggregateExprs().size() > 0) {
if (aggInfo.getAggregateExprs() != null && aggInfo.getMaterializedAggregateExprs().size() > 0) {
output.append(detailPrefix + "output: ").append(
getExplainString(aggInfo.getAggregateExprs()) + "\n");
}
Expand Down
3 changes: 2 additions & 1 deletion fe/src/com/baidu/palo/planner/Planner.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQuer
}
}

// TODO chenhao16 , no used materialization work
// compute referenced slots before calling computeMemLayout()
analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null);
//analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null);

setResultExprScale(analyzer, queryStmt.getResultExprs());

Expand Down
3 changes: 3 additions & 0 deletions fe/src/com/baidu/palo/planner/SingleNodePlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,9 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef)
}
// assignConjuncts(scanNode, analyzer);
scanNode.init(analyzer);
// TODO chenhao16 add
// materialize conjuncts in where
analyzer.materializeSlots(scanNode.getConjuncts());
scanNodes.add(scanNode);
return scanNode;
}
Expand Down
7 changes: 7 additions & 0 deletions fe/src/com/baidu/palo/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,13 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) {
if (request.isSetResourceInfo()) {
ctx.getSessionVariable().setResourceGroup(request.getResourceInfo().getGroup());
}
if (request.isSetExecMemLimit()) {
ctx.getSessionVariable().setMaxExecMemByte(request.getExecMemLimit());
}
if (request.isSetQueryTimeout()) {
ctx.getSessionVariable().setQueryTimeoutS(request.getQueryTimeout());
}

ctx.setThreadLocalInfo();

StmtExecutor executor = null;
Expand Down
2 changes: 2 additions & 0 deletions fe/src/com/baidu/palo/qe/MasterOpExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ private void forward() throws Exception {
params.setUser(ctx.getUser());
params.setDb(ctx.getDatabase());
params.setResourceInfo(ctx.toResourceCtx());
params.setExecMemLimit(ctx.getSessionVariable().getMaxExecMemByte());
params.setQueryTimeout(ctx.getSessionVariable().getQueryTimeoutS());

LOG.info("Forward statement {} to Master {}", originStmt, thriftAddress);

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ struct TMasterOpRequest {
3: required string sql
4: optional Types.TResourceInfo resourceInfo
5: optional string cluster
6: optional i64 execMemLimit
7: optional i32 queryTimeout
}

struct TColumnDefinition {
Expand Down