From 1a46fa36fff1a5d379daa399b1345bc7ffe96bae Mon Sep 17 00:00:00 2001 From: chenmingyu Date: Fri, 6 Apr 2018 10:13:55 +0800 Subject: [PATCH] merge to 2d4cc9e1358c980b4f726e17d036639bc31127aa contains: first_value with PRECEDING LEFT and NON-PRECEDING RIGHT rewrite error and count* materialize SlotDescriptor error when referring to the slot of the current query and subquery Simultaneously. fix join and count(*) materialize SlotDescriptor error. fix materialize scannode's conjuncts bug. remove no used materialization work. it have to evaluate orderby in subquery because we limit the number of rows returned by subquery. the method of judging limit is wrong. user info is missing when retrying to call load check. It's wrong to pass aggregate function when it's param is not materialized. InsertStmt does not pass the session param to observer. --- be/src/http/action/mini_load.cpp | 2 + .../baidu/palo/analysis/AggregateInfo.java | 8 +-- .../com/baidu/palo/analysis/AnalyticExpr.java | 2 +- .../com/baidu/palo/analysis/LimitElement.java | 4 ++ fe/src/com/baidu/palo/analysis/QueryStmt.java | 68 ++++++++++--------- .../com/baidu/palo/analysis/SelectStmt.java | 2 +- .../baidu/palo/planner/AggregationNode.java | 5 +- fe/src/com/baidu/palo/planner/Planner.java | 3 +- .../baidu/palo/planner/SingleNodePlanner.java | 3 + .../com/baidu/palo/qe/ConnectProcessor.java | 7 ++ .../com/baidu/palo/qe/MasterOpExecutor.java | 2 + gensrc/thrift/FrontendService.thrift | 2 + 12 files changed, 65 insertions(+), 43 deletions(-) diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index 4777c4c0e8e5e8..b867812411e987 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -442,6 +442,8 @@ Status MiniLoadAction::check_auth(HttpRequest* http_req) { return status; } client->loadCheck(res, req); + _user.assign(req.user); + _cluster.assign(cluster); } } catch (apache::thrift::TException& e) { // failed when retry. diff --git a/fe/src/com/baidu/palo/analysis/AggregateInfo.java b/fe/src/com/baidu/palo/analysis/AggregateInfo.java index 659e0c26215ff9..8db80caec571de 100644 --- a/fe/src/com/baidu/palo/analysis/AggregateInfo.java +++ b/fe/src/com/baidu/palo/analysis/AggregateInfo.java @@ -297,8 +297,8 @@ private void createDistinctAggInfo( public ArrayList getMaterializedAggregateExprs() { ArrayList result = Lists.newArrayList(); - for (Integer i : materializedAggregateSlots_) { - result.add(aggregateExprs_.get(i)); + for (Integer i: materializedSlots_) { + result.add(aggregateExprs_.get(i)); } return result; } @@ -725,8 +725,8 @@ public void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap // over query statements, if aggregate functions contain count(*), now // materialize all slots this SelectStmt maps. // chenhao added. - if (hasCountStar) { - resolvedExprs = smap.getRhs(); + if (hasCountStar && smap != null && smap.size() > 0) { + resolvedExprs.addAll(smap.getRhs()); } analyzer.materializeSlots(resolvedExprs); diff --git a/fe/src/com/baidu/palo/analysis/AnalyticExpr.java b/fe/src/com/baidu/palo/analysis/AnalyticExpr.java index e20593cd62c63f..dabc3efc848322 100644 --- a/fe/src/com/baidu/palo/analysis/AnalyticExpr.java +++ b/fe/src/com/baidu/palo/analysis/AnalyticExpr.java @@ -660,7 +660,7 @@ private void standardize(Analyzer analyzer) throws AnalysisException { // -1 indicates that no NULL values are inserted even though we set the end // bound to the start bound (which is PRECEDING) below; this is different from // the default behavior of windows with an end bound PRECEDING. - paramExprs.add(new DecimalLiteral(BigDecimal.valueOf(-1))); + paramExprs.add(new IntLiteral(-1, Type.BIGINT)); } window = new AnalyticWindow(window.getType(), diff --git a/fe/src/com/baidu/palo/analysis/LimitElement.java b/fe/src/com/baidu/palo/analysis/LimitElement.java index 79661315372d4a..105a9b9ab40692 100644 --- a/fe/src/com/baidu/palo/analysis/LimitElement.java +++ b/fe/src/com/baidu/palo/analysis/LimitElement.java @@ -80,6 +80,10 @@ public long getOffset() { return offset; } + public boolean hasOffset() { + return offset != 0; + } + public String toSql() { if (limit == -1) { return ""; diff --git a/fe/src/com/baidu/palo/analysis/QueryStmt.java b/fe/src/com/baidu/palo/analysis/QueryStmt.java index 5c46778f939aef..77e29707018f5b 100644 --- a/fe/src/com/baidu/palo/analysis/QueryStmt.java +++ b/fe/src/com/baidu/palo/analysis/QueryStmt.java @@ -156,20 +156,20 @@ public List getCorrelatedTupleIds(Analyzer analyzer) collectTableRefs(tblRefs); for (TableRef tblRef: tblRefs) { if (absoluteRef == null && !tblRef.isRelative()) absoluteRef = tblRef; - if (tblRef.isCorrelated()) { - /* - // Check if the correlated table ref is rooted at a tuple descriptor from within - // this query stmt. If so, the correlation is contained within this stmt - // and the table ref does not conflict with absolute refs. - CollectionTableRef t = (CollectionTableRef) tblRef; - Preconditions.checkState(t.getResolvedPath().isRootedAtTuple()); - // This check relies on tblRefs being in depth-first order. - if (!tblRefIds.contains(t.getResolvedPath().getRootDesc().getId())) { - if (correlatedRef == null) correlatedRef = tblRef; - correlatedTupleIds.add(t.getResolvedPath().getRootDesc().getId()); - } - */ - } + /*if (tblRef.isCorrelated()) { + * + * // Check if the correlated table ref is rooted at a tuple descriptor from within + * // this query stmt. If so, the correlation is contained within this stmt + * // and the table ref does not conflict with absolute refs. + * CollectionTableRef t = (CollectionTableRef) tblRef; + * Preconditions.checkState(t.getResolvedPath().isRootedAtTuple()); + * // This check relies on tblRefs being in depth-first order. + * if (!tblRefIds.contains(t.getResolvedPath().getRootDesc().getId())) { + * if (correlatedRef == null) correlatedRef = tblRef; + * correlatedTupleIds.add(t.getResolvedPath().getRootDesc().getId()); + * } + * + }*/ if (correlatedRef != null && absoluteRef != null) { throw new AnalysisException(String.format( "Nested query is illegal because it contains a table reference '%s' " + @@ -234,23 +234,25 @@ protected void createSortInfo(Analyzer analyzer) throws AnalysisException { sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams); // order by w/o limit and offset in inline views, union operands and insert statements // are ignored. - if (!hasLimit() && !hasOffset() && !analyzer.isRootAnalyzer()) { - evaluateOrderBy = false; - // Return a warning that the order by was ignored. - StringBuilder strBuilder = new StringBuilder(); - strBuilder.append("Ignoring ORDER BY clause without LIMIT or OFFSET: "); - strBuilder.append("ORDER BY "); - strBuilder.append(orderByElements.get(0).toSql()); - for (int i = 1; i < orderByElements.size(); ++i) { - strBuilder.append(", ").append(orderByElements.get(i).toSql()); - } - strBuilder.append(".\nAn ORDER BY appearing in a view, subquery, union operand, "); - strBuilder.append("or an insert/ctas statement has no effect on the query result "); - strBuilder.append("unless a LIMIT and/or OFFSET is used in conjunction "); - strBuilder.append("with the ORDER BY."); - } else { - evaluateOrderBy = true; - } + // TODO chenhao, open this when we don't limit rows subquery returns by SortNode. + /*if (!hasLimit() && !hasOffset() && !analyzer.isRootAnalyzer()) { + * evaluateOrderBy = false; + * // Return a warning that the order by was ignored. + * StringBuilder strBuilder = new StringBuilder(); + * strBuilder.append("Ignoring ORDER BY clause without LIMIT or OFFSET: "); + * strBuilder.append("ORDER BY "); + * strBuilder.append(orderByElements.get(0).toSql()); + * for (int i = 1; i < orderByElements.size(); ++i) { + * strBuilder.append(", ").append(orderByElements.get(i).toSql()); + * } + * strBuilder.append(".\nAn ORDER BY appearing in a view, subquery, union operand, "); + * strBuilder.append("or an insert/ctas statement has no effect on the query result "); + * strBuilder.append("unless a LIMIT and/or OFFSET is used in conjunction "); + * strBuilder.append("with the ORDER BY."); + * } else { + */ + evaluateOrderBy = true; + //} } /** @@ -400,8 +402,8 @@ public void removeOrderByElements() { public boolean hasOrderByClause() { return orderByElements != null; } - public boolean hasLimit() { return limitElement != null; } - public boolean hasOffset() { return limitElement != null && limitElement.getOffset() != 0; } + public boolean hasLimit() { return limitElement != null && limitElement.hasLimit(); } + public boolean hasOffset() { return limitElement != null && limitElement.hasOffset(); } public long getLimit() { return limitElement.getLimit(); diff --git a/fe/src/com/baidu/palo/analysis/SelectStmt.java b/fe/src/com/baidu/palo/analysis/SelectStmt.java index 224dfde3c2ffd5..22a07063b79859 100644 --- a/fe/src/com/baidu/palo/analysis/SelectStmt.java +++ b/fe/src/com/baidu/palo/analysis/SelectStmt.java @@ -435,7 +435,7 @@ public void materializeRequiredSlots(Analyzer analyzer) throws AnalysisException analyzer.getUnassignedConjuncts(getTableRefIds(), true); List unassignedJoinConjuncts = Lists.newArrayList(); for (Expr e: unassigned) { - if (analyzer.evalByJoin(e)) { + if (analyzer.evalAfterJoin(e)) { unassignedJoinConjuncts.add(e); } } diff --git a/fe/src/com/baidu/palo/planner/AggregationNode.java b/fe/src/com/baidu/palo/planner/AggregationNode.java index d5d1a5233ca508..841f2e7d6e97e7 100644 --- a/fe/src/com/baidu/palo/planner/AggregationNode.java +++ b/fe/src/com/baidu/palo/planner/AggregationNode.java @@ -236,8 +236,7 @@ protected void toThrift(TPlanNode msg) { List aggregateFunctions = Lists.newArrayList(); // only serialize agg exprs that are being materialized - //for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) { - for (FunctionCallExpr e : aggInfo.getAggregateExprs()) { + for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) { aggregateFunctions.add(e.treeToThrift()); } msg.agg_node = @@ -254,7 +253,7 @@ protected void toThrift(TPlanNode msg) { @Override protected String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); - if (aggInfo.getAggregateExprs() != null && aggInfo.getAggregateExprs().size() > 0) { + if (aggInfo.getAggregateExprs() != null && aggInfo.getMaterializedAggregateExprs().size() > 0) { output.append(detailPrefix + "output: ").append( getExplainString(aggInfo.getAggregateExprs()) + "\n"); } diff --git a/fe/src/com/baidu/palo/planner/Planner.java b/fe/src/com/baidu/palo/planner/Planner.java index 85cc5c529ad2fa..c2119cb9a387c2 100644 --- a/fe/src/com/baidu/palo/planner/Planner.java +++ b/fe/src/com/baidu/palo/planner/Planner.java @@ -151,8 +151,9 @@ public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQuer } } + // TODO chenhao16 , no used materialization work // compute referenced slots before calling computeMemLayout() - analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null); + //analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null); setResultExprScale(analyzer, queryStmt.getResultExprs()); diff --git a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java index 53ecca03b57e29..29c8c762740024 100644 --- a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java +++ b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java @@ -1140,6 +1140,9 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef) } // assignConjuncts(scanNode, analyzer); scanNode.init(analyzer); + // TODO chenhao16 add + // materialize conjuncts in where + analyzer.materializeSlots(scanNode.getConjuncts()); scanNodes.add(scanNode); return scanNode; } diff --git a/fe/src/com/baidu/palo/qe/ConnectProcessor.java b/fe/src/com/baidu/palo/qe/ConnectProcessor.java index 1fa0c1fb4fef96..cc93108fe3404c 100644 --- a/fe/src/com/baidu/palo/qe/ConnectProcessor.java +++ b/fe/src/com/baidu/palo/qe/ConnectProcessor.java @@ -311,6 +311,13 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { if (request.isSetResourceInfo()) { ctx.getSessionVariable().setResourceGroup(request.getResourceInfo().getGroup()); } + if (request.isSetExecMemLimit()) { + ctx.getSessionVariable().setMaxExecMemByte(request.getExecMemLimit()); + } + if (request.isSetQueryTimeout()) { + ctx.getSessionVariable().setQueryTimeoutS(request.getQueryTimeout()); + } + ctx.setThreadLocalInfo(); StmtExecutor executor = null; diff --git a/fe/src/com/baidu/palo/qe/MasterOpExecutor.java b/fe/src/com/baidu/palo/qe/MasterOpExecutor.java index 2f44c2fbf5f2f1..b28ec14dbf69a4 100644 --- a/fe/src/com/baidu/palo/qe/MasterOpExecutor.java +++ b/fe/src/com/baidu/palo/qe/MasterOpExecutor.java @@ -75,6 +75,8 @@ private void forward() throws Exception { params.setUser(ctx.getUser()); params.setDb(ctx.getDatabase()); params.setResourceInfo(ctx.toResourceCtx()); + params.setExecMemLimit(ctx.getSessionVariable().getMaxExecMemByte()); + params.setQueryTimeout(ctx.getSessionVariable().getQueryTimeoutS()); LOG.info("Forward statement {} to Master {}", originStmt, thriftAddress); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 2af174be4f524f..0e37784b66d2ef 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -400,6 +400,8 @@ struct TMasterOpRequest { 3: required string sql 4: optional Types.TResourceInfo resourceInfo 5: optional string cluster + 6: optional i64 execMemLimit + 7: optional i32 queryTimeout } struct TColumnDefinition {