From 78475663b2aadd9a28f79db283302c6ba8f1f3f4 Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Mon, 18 Dec 2017 11:15:53 +0800 Subject: [PATCH 1/4] exchange check error when parent child is union --- be/src/exec/exchange_node.cpp | 2 +- be/src/exec/pl_task_root.cpp | 152 --------------------------------- be/src/exec/pl_task_root.h | 53 ------------ be/src/runtime/descriptors.cpp | 43 ++++++++++ be/src/runtime/descriptors.h | 32 +++++++ 5 files changed, 76 insertions(+), 206 deletions(-) delete mode 100644 be/src/exec/pl_task_root.cpp delete mode 100644 be/src/exec/pl_task_root.h diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index df2ec79a261c4c..827473addb48f0 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -201,7 +201,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* } _next_row_idx = 0; - DCHECK(_input_batch->row_desc().is_prefix_of(output_batch->row_desc())); + DCHECK(_input_batch->row_desc().layout_is_prefix_of(output_batch->row_desc())); } } diff --git a/be/src/exec/pl_task_root.cpp b/be/src/exec/pl_task_root.cpp deleted file mode 100644 index b90ee388c33a0f..00000000000000 --- a/be/src/exec/pl_task_root.cpp +++ /dev/null @@ -1,152 +0,0 @@ -// Modifications copyright (C) 2017, Baidu.com, Inc. -// Copyright 2017 The Apache Software Foundation - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/pl_task_root.h" - -namespace palo { - -ExchangeNode::ExchangeNode( - ObjectPool* pool, - const TPlanNode& tnode, - const DescriptorTbl& descs) : - ExecNode(pool, tnode, descs), - _num_senders(0), - _stream_recvr(NULL), - _next_row_idx(0) { -} - -ExchangeNode::~ExchangeNode() { -} - -Status ExchangeNode::init(const TPlanNode& tnode) { - return ExecNode::init(tnode); -} - -Status ExchangeNode::prepare(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::prepare(state)); - - _convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime"); - - // TODO: figure out appropriate buffer size - DCHECK_GT(_num_senders, 0); - _stream_recvr = state->create_recvr(_row_descriptor, _id, _num_senders, - config::exchg_node_buffer_size_bytes, runtime_profile()); - return Status::OK; -} - -Status ExchangeNode::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(ExecNode::open(state)); - return Status::OK; -} - -Status ExchangeNode::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK; - } - return ExecNode::close(state); -} - -Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) { - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - - if (reached_limit()) { - *eos = true; - return Status::OK; - } - - ExprContext* const* ctxs = &_conjunct_ctxs[0]; - int num_ctxs = _conjunct_ctxs.size(); - - while (true) { - { - SCOPED_TIMER(_convert_row_batch_timer); - - // copy rows until 
we hit the limit/capacity or until we exhaust _input_batch - while (!reached_limit() && !output_batch->is_full() - && _input_batch.get() != NULL && _next_row_idx < _input_batch->capacity()) { - TupleRow* src = _input_batch->get_row(_next_row_idx); - - if (ExecNode::eval_conjuncts(ctxs, num_ctxs, src)) { - int j = output_batch->add_row(); - TupleRow* dest = output_batch->get_row(j); - // if the input row is shorter than the output row, make sure not to leave - // uninitialized Tuple* around - output_batch->clear_row(dest); - // this works as expected if rows from input_batch form a prefix of - // rows in output_batch - _input_batch->copy_row(src, dest); - output_batch->commit_last_row(); - ++_num_rows_returned; - } - - ++_next_row_idx; - } - - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - - if (reached_limit()) { - *eos = true; - return Status::OK; - } - - if (output_batch->is_full()) { - *eos = false; - return Status::OK; - } - } - - // we need more rows - if (_input_batch.get() != NULL) { - _input_batch->transfer_resource_ownership(output_batch); - } - - bool is_cancelled = true; - _input_batch.reset(_stream_recvr->get_batch(&is_cancelled)); - VLOG_FILE << "exch: has batch=" << (_input_batch.get() == NULL ? "false" : "true") - << " #rows=" << (_input_batch.get() != NULL ? _input_batch->num_rows() : 0) - << " is_cancelled=" << (is_cancelled ? 
"true" : "false") - << " instance_id=" << state->fragment_instance_id(); - - if (is_cancelled) { - return Status::CANCELLED; - } - - *eos = (_input_batch.get() == NULL); - - if (*eos) { - return Status::OK; - } - - _next_row_idx = 0; - DCHECK(_input_batch->row_desc().is_prefix_of(output_batch->row_desc())); - } -} - -void ExchangeNode::debug_string(int indentation_level, std::stringstream* out) const { - *out << string(indentation_level * 2, ' '); - *out << "ExchangeNode(#senders=" << _num_senders; - ExecNode::debug_string(indentation_level, out); - *out << ")"; -} - -} - diff --git a/be/src/exec/pl_task_root.h b/be/src/exec/pl_task_root.h deleted file mode 100644 index 2015cc2a20b808..00000000000000 --- a/be/src/exec/pl_task_root.h +++ /dev/null @@ -1,53 +0,0 @@ -// Modifications copyright (C) 2017, Baidu.com, Inc. -// Copyright 2017 The Apache Software Foundation. - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#pragma once - -#include "exec/exec_node.h" - -namespace palo { - -// Pull load task root -class PlTaskRoot : public ExecNode { -public: - PlTaskRoot(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual ~PlTaskRoot(); - - virtual Status init(const TPlanNode& tnode); - virtual Status prepare(RuntimeState* state); - virtual Status open(RuntimeState* state); - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status close(RuntimeState* state); - - // the number of senders needs to be set after the c'tor, because it's not - // recorded in TPlanNode, and before calling prepare() - void set_num_senders(int num_senders) { - _num_senders = num_senders; - } - -protected: - virtual void debug_string(int indentation_level, std::stringstream* out) const; - -private: - int _num_senders; // needed for _stream_recvr construction -}; - -} - diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 7a803112eac801..84dad3e381d119 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -58,6 +58,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc) _null_indicator_offset(tdesc.nullIndicatorByte, tdesc.nullIndicatorBit), _col_name(tdesc.colName), _slot_idx(tdesc.slotIdx), + _slot_size(_type.get_slot_size()), _field_idx(-1), _is_materialized(tdesc.isMaterialized), _is_null_fn(NULL), @@ -191,6 +192,26 @@ void TupleDescriptor::add_slot(SlotDescriptor* slot) { } } +std::vector TupleDescriptor::slots_ordered_by_idx() const { + std::vector sorted_slots(slots().size()); + for (SlotDescriptor* slot: slots()) { + sorted_slots[slot->_slot_idx] = slot; + } + return sorted_slots; +} + +bool TupleDescriptor::layout_equals(const TupleDescriptor& other_desc) const { + if (byte_size() != other_desc.byte_size()) return false; + if (slots().size() != other_desc.slots().size()) return false; + + std::vector slots = slots_ordered_by_idx(); + std::vector other_slots = 
other_desc.slots_ordered_by_idx(); + for (int i = 0; i < slots.size(); ++i) { + if (!slots[i]->layout_equals(*other_slots[i])) return false; + } + return true; +} + std::string TupleDescriptor::debug_string() const { std::stringstream out; out << "Tuple(id=" << _id << " size=" << _byte_size; @@ -330,6 +351,19 @@ bool RowDescriptor::equals(const RowDescriptor& other_desc) const { return true; } +bool RowDescriptor::layout_is_prefix_of(const RowDescriptor& other_desc) const { + if (_tuple_desc_map.size() > other_desc._tuple_desc_map.size()) return false; + for (int i = 0; i < _tuple_desc_map.size(); ++i) { + if (!_tuple_desc_map[i]->layout_equals(*other_desc._tuple_desc_map[i])) return false; + } + return true; +} + +bool RowDescriptor::layout_equals(const RowDescriptor& other_desc) const { + if (_tuple_desc_map.size() != other_desc._tuple_desc_map.size()) return false; + return layout_is_prefix_of(other_desc); +} + std::string RowDescriptor::debug_string() const { std::stringstream ss; @@ -468,6 +502,15 @@ void DescriptorTbl::get_tuple_descs(std::vector* descs) const } } +bool SlotDescriptor::layout_equals(const SlotDescriptor& other_desc) const { + if (type() != other_desc.type()) return false; + if (is_nullable() != other_desc.is_nullable()) return false; + if (slot_size() != other_desc.slot_size()) return false; + if (tuple_offset() != other_desc.tuple_offset()) return false; + if (!null_indicator_offset().equals(other_desc.null_indicator_offset())) return false; + return true; +} + // Generate function to check if a slot is null. The resulting IR looks like: // (in this case the tuple contains only a nullable double) // define i1 @IsNull({ i8, double }* %tuple) { diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index fd814efa3f3888..262e960175f28a 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -71,6 +71,10 @@ struct NullIndicatorOffset { : byte_offset(byte_offset), bit_mask(bit_offset == -1 ? 
0 : 1 << (7 - bit_offset)) { } + + bool equals(const NullIndicatorOffset& o) const { + return this->byte_offset == o.byte_offset && this->bit_mask == o.bit_mask; + } std::string debug_string() const; }; @@ -110,10 +114,19 @@ class SlotDescriptor { bool is_nullable() const { return _null_indicator_offset.bit_mask != 0; } + + int slot_size() const { + return _slot_size; + } + std::string col_name() const { return _col_name; } + /// Return true if the physical layout of this descriptor matches the physical layout + /// of other_desc, but not necessarily ids. + bool layout_equals(const SlotDescriptor& other_desc) const; + std::string debug_string() const; // Codegen for: bool IsNull(Tuple* tuple) @@ -140,6 +153,9 @@ class SlotDescriptor { // the idx of the slot in the tuple descriptor (0-based). // this is provided by the FE const int _slot_idx; + + // the byte size of this slot. + const int _slot_size; // the idx of the slot in the llvm codegen'd tuple struct // this is set by TupleDescriptor during codegen and takes into account @@ -305,6 +321,11 @@ class TupleDescriptor { TupleId id() const { return _id; } + + /// Return true if the physical layout of this descriptor matches that of other_desc, + /// but not necessarily the id. + bool layout_equals(const TupleDescriptor& other_desc) const; + std::string debug_string() const; // Creates a typed struct description for llvm. The layout of the struct is computed @@ -343,6 +364,9 @@ class TupleDescriptor { TupleDescriptor(const TTupleDescriptor& tdesc); void add_slot(SlotDescriptor* slot); + + /// Returns slots in their physical order. + std::vector slots_ordered_by_idx() const; }; class DescriptorTbl { @@ -447,6 +471,14 @@ class RowDescriptor { // Return true if the tuple ids of this descriptor match tuple ids of other desc. bool equals(const RowDescriptor& other_desc) const; + /// Return true if the physical layout of this descriptor matches the physical layout + /// of other_desc, but not necessarily the ids. 
+ bool layout_equals(const RowDescriptor& other_desc) const; + + /// Return true if the tuples of this descriptor are a prefix of the tuples of + /// other_desc. Tuples are compared by their physical layout and not by ids. + bool layout_is_prefix_of(const RowDescriptor& other_desc) const; + std::string debug_string() const; private: From 433f3b86bb758242c02e17d0dda7b8149c3137d8 Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Mon, 18 Dec 2017 11:22:11 +0800 Subject: [PATCH 2/4] can't materialize join's EqJoinPredicates when it references a Subquery's column --- fe/src/com/baidu/palo/analysis/SelectStmt.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/com/baidu/palo/analysis/SelectStmt.java b/fe/src/com/baidu/palo/analysis/SelectStmt.java index 0e5600cc42e31c..364dc3e4963377 100644 --- a/fe/src/com/baidu/palo/analysis/SelectStmt.java +++ b/fe/src/com/baidu/palo/analysis/SelectStmt.java @@ -432,7 +432,7 @@ public void materializeRequiredSlots(Analyzer analyzer) throws AnalysisException // can also be safely evaluated below the join (picked up by getBoundPredicates()). // Such predicates will be marked twice and that is ok. 
List unassigned = - analyzer.getUnassignedConjuncts(getTableRefIds()); + analyzer.getUnassignedConjuncts(getTableRefIds(), true); List unassignedJoinConjuncts = Lists.newArrayList(); for (Expr e: unassigned) { if (analyzer.evalByJoin(e)) { From 782f76875b91b04d0484d83d4b1d759eaab06758 Mon Sep 17 00:00:00 2001 From: chenhao7253886 <510341142@qq.com> Date: Mon, 18 Dec 2017 11:29:08 +0800 Subject: [PATCH 3/4] can't materialize join's EqJoinPredicates when it references a Subquery's column --- fe/src/com/baidu/palo/analysis/SelectStmt.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/com/baidu/palo/analysis/SelectStmt.java b/fe/src/com/baidu/palo/analysis/SelectStmt.java index 0e5600cc42e31c..364dc3e4963377 100644 --- a/fe/src/com/baidu/palo/analysis/SelectStmt.java +++ b/fe/src/com/baidu/palo/analysis/SelectStmt.java @@ -432,7 +432,7 @@ public void materializeRequiredSlots(Analyzer analyzer) throws AnalysisException // can also be safely evaluated below the join (picked up by getBoundPredicates()). // Such predicates will be marked twice and that is ok. 
List unassigned = - analyzer.getUnassignedConjuncts(getTableRefIds()); + analyzer.getUnassignedConjuncts(getTableRefIds(), true); List unassignedJoinConjuncts = Lists.newArrayList(); for (Expr e: unassigned) { if (analyzer.evalByJoin(e)) { From 3e33eba33f2a29dbdd4af0a656d2c8ec5bd84a5b Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Mon, 12 Mar 2018 16:00:22 +0800 Subject: [PATCH 4/4] distinguish limit 0 in analyze --- fe/src/com/baidu/palo/analysis/LimitElement.java | 6 ++++++ fe/src/com/baidu/palo/analysis/QueryStmt.java | 11 ++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/fe/src/com/baidu/palo/analysis/LimitElement.java b/fe/src/com/baidu/palo/analysis/LimitElement.java index 10c6056dde47d6..6443d8f582836c 100644 --- a/fe/src/com/baidu/palo/analysis/LimitElement.java +++ b/fe/src/com/baidu/palo/analysis/LimitElement.java @@ -91,7 +91,13 @@ public String toSql() { sb.append("" + limit); return sb.toString(); } + + public void analyze(Analyzer analyzer) { + if (limit == 0) analyzer.setHasEmptyResultSet(); + } public void reset() { + limit = -1; + offset = 0; } } diff --git a/fe/src/com/baidu/palo/analysis/QueryStmt.java b/fe/src/com/baidu/palo/analysis/QueryStmt.java index fd11ec36cfceb3..84361697263269 100644 --- a/fe/src/com/baidu/palo/analysis/QueryStmt.java +++ b/fe/src/com/baidu/palo/analysis/QueryStmt.java @@ -110,10 +110,19 @@ public abstract class QueryStmt extends StatementBase { public void analyze(Analyzer analyzer) throws AnalysisException, InternalException { if (isAnalyzed()) return; super.analyze(analyzer); - // analyzeLimit(analyzer); + analyzeLimit(analyzer); if (hasWithClause()) withClause_.analyze(analyzer); } + private void analyzeLimit(Analyzer analyzer) throws AnalysisException { + // TODO chenhao + if (limitElement.getOffset() > 0 && !hasOrderByClause()) { + throw new AnalysisException("OFFSET requires an ORDER BY clause: " + + limitElement.toSql().trim()); + } + limitElement.analyze(analyzer); + } + /** * Returns a 
list containing all the materialized tuple ids that this stmt is * correlated with (i.e., those tuple ids from outer query blocks that TableRefs