diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 368e94562a264f..ed032d0976700e 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -514,6 +514,24 @@ std::string ExecNode::get_name() { Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_projection_timer); + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (_keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } + } else { + if (_keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } + } + }; + using namespace vectorized; MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); @@ -535,13 +553,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo auto column_ptr = origin_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 1dd8979f5b36af..05e32528e9e58d 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -325,6 +325,8 @@ class ExecNode { std::shared_ptr _query_statistics = nullptr; + bool _keep_origin = false; + private: static Status create_tree_helper(RuntimeState* state, ObjectPool* pool, const std::vector& tnodes, diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 03b20fdb4d4d35..e0a325e5557991 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -17,6 +17,11 @@ #include "join_probe_operator.h" +#include + +#include "common/exception.h" +#include "common/logging.h" +#include "common/status.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/operator.h" @@ -82,6 +87,14 @@ template Status JoinProbeLocalState::_build_output_block( vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) { auto& p = Base::_parent->template cast(); + if (!Base::_projections.empty()) { + *output_block = *origin_block; + if (p._is_outer_join) { + DCHECK(output_block->columns() >= 2); + output_block->erase_tail(output_block->columns() - 2); + } + return Status::OK(); + } SCOPED_TIMER(_build_output_block_timer); auto is_mem_reuse = output_block->mem_reuse(); vectorized::MutableBlock mutable_block = @@ -93,10 +106,10 @@ Status JoinProbeLocalState::_build_output_block( // TODO: After FE plan support same nullable of output expr and origin block and mutable column // we should replace `insert_column_datas` by `insert_range_from` - auto insert_column_datas = [keep_origin](auto& to, vectorized::ColumnPtr& from, size_t rows) { + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { if (to->is_nullable() && !from->is_nullable()) { if (keep_origin || !from->is_exclusive()) { - auto& null_column = reinterpret_cast(*to); + auto& null_column = assert_cast(*to); null_column.get_nested_column().insert_range_from(*from, 0, rows); null_column.get_null_map_column().get_data().resize_fill(rows, 0); } else { @@ -109,7 +122,48 @@ Status JoinProbeLocalState::_build_output_block( to = from->assume_mutable(); } } + + if (to->is_nullable()) { + auto& null_column = assert_cast(*to); + bool all_null = true; + for (int i = 0; i < null_column.size(); i++) { + if (!null_column.is_null_at(i)) { + all_null = false; + } + } + LOG_WARNING("yxc test null").tag("null node id", p._node_id).tag("all null", all_null); + } + }; + + LOG_WARNING("yxc test empty copy"); + + auto print = [](vectorized::VExprContextSPtrs& ctxs) { + std::string s = "[ "; + for (auto& ctx : ctxs) { + s += ctx->root()->debug_string() + "\t\t\t"; + } + return s + " ]"; }; + + auto print_desc = [](std::unique_ptr& desc) { + std::string ret; + if (!desc) { + ret = "is null"; + } else { + ret = desc->debug_string(); + } + return ret; + }; + + LOG_WARNING("yxc test ") + .tag("node id", p._node_id) + .tag("Base::_projections", print(Base::_projections)) + .tag("_output_expr_ctxs", print(_output_expr_ctxs)) + .tag("output_block", mutable_block.dump_names() + " " + mutable_block.dump_types()) + .tag("orgin block ", origin_block->dump_structure()) + .tag("_intermediate_row_desc", print_desc(p._intermediate_row_desc)) + .tag("plan node", print_desc(p._output_row_descriptor)) + .tag("join node", print_desc(p._output_row_desc)); if (rows != 0) { auto& mutable_columns = mutable_block.mutable_columns(); if (_output_expr_ctxs.empty()) { @@ -197,18 +251,32 @@ JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const T _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + if (!Base::_output_row_descriptor) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } else { + if (tnode.hash_join_node.__isset.voutput_tuple_id) { + throw Exception {ErrorCode::INTERNAL_ERROR, + "yxc test error .__isset.voutput_tuple_id"}; + } + } + } else if (tnode.__isset.nested_loop_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + if (!Base::_output_row_descriptor) { + _output_row_desc.reset(new RowDescriptor( + descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + } } else { // Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN. DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN); } + + if (Base::_node_id == 13) { + LOG_WARNING("yxc test"); + } } template diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 9bb716ff36dfe3..be68e547de0f68 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -70,7 +70,12 @@ class JoinProbeOperatorX : public StatefulOperatorX { Status init(const TPlanNode& tnode, RuntimeState* state) override; Status open(doris::RuntimeState* state) override; - [[nodiscard]] const RowDescriptor& row_desc() const override { return *_output_row_desc; } + [[nodiscard]] const RowDescriptor& row_desc() const override { + if (Base::_output_row_descriptor) { + return *Base::_output_row_descriptor; + } + return *_output_row_desc; + } [[nodiscard]] const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 9272418ca0ad60..35d5bc6085ee41 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -436,7 +436,9 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con : JoinProbeOperatorX(pool, tnode, operator_id, descs), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), - _old_version_flag(!tnode.__isset.nested_loop_join_node) {} + _old_version_flag(!tnode.__isset.nested_loop_join_node) { + _keep_origin = _is_output_left_side_only; +} Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(JoinProbeOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 770289f397f2ba..7a8be87d922b90 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -228,7 +228,7 @@ class NestedLoopJoinProbeOperatorX final const RowDescriptor& row_desc() const override { return _old_version_flag ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : *_output_row_desc; + : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); } bool need_more_input_data(RuntimeState* state) const override; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index f9a30c7e3ac65e..e14b6d94aa8a27 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -172,9 +172,28 @@ void PipelineXLocalStateBase::clear_origin_block() { Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, vectorized::Block* output_block) const { - auto local_state = state->get_local_state(operator_id()); + auto* local_state = state->get_local_state(operator_id()); SCOPED_TIMER(local_state->exec_time_counter()); SCOPED_TIMER(local_state->_projection_timer); + + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (_keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } + } else { + if (_keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } + } + }; + using namespace vectorized; vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, @@ -189,14 +208,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id)); auto column_ptr = origin_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); - //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 06ba93a36f3832..18395229f9117b 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -326,6 +326,8 @@ class OperatorXBase : public OperatorBase { std::string _op_name; bool _ignore_data_distribution = false; int _parallel_tasks = 0; + + bool _keep_origin = false; }; template diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 6fab6b8b91f759..79ca34e8326d9b 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -95,14 +95,18 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des } if (tnode.__isset.hash_join_node) { - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + if (!_output_row_descriptor) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); } else if (tnode.__isset.nested_loop_join_node) { - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + if (!_output_row_descriptor) { + _output_row_desc.reset(new RowDescriptor( + descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + } _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); @@ -166,6 +170,14 @@ void VJoinNodeBase::_construct_mutable_join_block() { Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block, bool keep_origin) { SCOPED_TIMER(_build_output_block_timer); + if (!_projections.empty()) { + *output_block = *origin_block; + if (_is_outer_join) { + DCHECK(output_block->columns() >= 2); + output_block->erase_tail(output_block->columns() - 2); + } + return Status::OK(); + } auto is_mem_reuse = output_block->mem_reuse(); MutableBlock mutable_block = is_mem_reuse diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 0e6ac3c9837db8..449d58b6fb14da 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -63,7 +63,12 @@ class VJoinNodeBase : public ExecNode { Status open(RuntimeState* state) override; - const RowDescriptor& row_desc() const override { return *_output_row_desc; } + const RowDescriptor& row_desc() const override { + if (_output_row_descriptor) { + return *_output_row_descriptor; + } + return *_output_row_desc; + } const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 3548680bf4998e..77eef4d4da5a00 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -102,6 +102,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { if (tnode.nested_loop_join_node.__isset.is_output_left_side_only) { _is_output_left_side_only = tnode.nested_loop_join_node.is_output_left_side_only; + _keep_origin = _is_output_left_side_only; } if (tnode.nested_loop_join_node.__isset.join_conjuncts && diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index fd31b651bd3a13..18bc901222f3fb 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -92,7 +92,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { const RowDescriptor& row_desc() const override { return _old_version_flag ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : *_output_row_desc; + : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); } std::shared_ptr get_left_block() { return _left_block; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index d7722e090d0240..499d1571d39c33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -222,6 +222,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionIds, executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); ctx.setExecutor(executor); ctx.setQueryId(queryId); + ctx.getState().setNereids(true); command.run(ctx, executor); if (ctx.getState().getStateType() != MysqlStateType.OK) { throw new JobException(ctx.getState().getErrorMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6b02d3e7d3406b..099e2c0e23e741 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1539,8 +1539,8 @@ public PlanFragment visitPhysicalHashJoin( TupleDescriptor outputDescriptor = context.generateTupleDesc(); outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s)); - hashJoinNode.setvOutputTupleDesc(outputDescriptor); - hashJoinNode.setvSrcToOutputSMap(srcToOutput); + hashJoinNode.setOutputTupleDesc(outputDescriptor); + hashJoinNode.setProjectList(srcToOutput); } if (hashJoin.getStats() != null) { hashJoinNode.setCardinality((long) hashJoin.getStats().getRowCount()); @@ -1720,8 +1720,8 @@ public PlanFragment visitPhysicalNestedLoopJoin( TupleDescriptor outputDescriptor = context.generateTupleDesc(); outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s)); - nestedLoopJoinNode.setvOutputTupleDesc(outputDescriptor); - nestedLoopJoinNode.setvSrcToOutputSMap(srcToOutput); + nestedLoopJoinNode.setOutputTupleDesc(outputDescriptor); + nestedLoopJoinNode.setProjectList(srcToOutput); } if (nestedLoopJoin.getStats() != null) { nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount()); @@ -1847,8 +1847,8 @@ public PlanFragment visitPhysicalProject(PhysicalProject project if (inputPlanNode instanceof JoinNodeBase) { TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context); JoinNodeBase joinNode = (JoinNodeBase) inputPlanNode; - joinNode.setvOutputTupleDesc(tupleDescriptor); - joinNode.setvSrcToOutputSMap(projectionExprs); + joinNode.setOutputTupleDesc(tupleDescriptor); + joinNode.setProjectList(projectionExprs); // prune the hashOutputSlotIds if (joinNode instanceof HashJoinNode) { ((HashJoinNode) joinNode).getHashOutputSlotIds().clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index d8cc4a77a0ab07..662e0d153e9c98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -38,6 +38,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TEqJoinCondition; import org.apache.doris.thrift.TExplainLevel; @@ -79,10 +80,10 @@ public class HashJoinNode extends JoinNodeBase { private List markJoinConjuncts; private DistributionMode distrMode; - private boolean isColocate = false; //the flag for colocate join + private boolean isColocate = false; // the flag for colocate join private String colocateReason = ""; // if can not do colocate join, set reason here - private Set hashOutputSlotIds = Sets.newHashSet(); //init for nereids + private Set hashOutputSlotIds = Sets.newHashSet(); // init for nereids private Map hashOutputExprSlotIdMap = Maps.newHashMap(); @@ -171,7 +172,7 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator nullableTupleIds.addAll(outer.getTupleIds()); } vIntermediateTupleDescList = Lists.newArrayList(intermediateTuple); - vOutputTupleDesc = outputTuple; + this.outputTupleDesc = outputTuple; vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, Collections.emptyList()); } @@ -220,7 +221,7 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator nullableTupleIds.addAll(outer.getTupleIds()); } vIntermediateTupleDescList = Lists.newArrayList(intermediateTuple); - vOutputTupleDesc = outputTuple; + this.outputTupleDesc = outputTuple; vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, Collections.emptyList()); } @@ -250,12 +251,15 @@ public void setColocate(boolean colocate, String reason) { } /** - * Calculate the slots output after going through the hash table in the hash join node. - * The most essential difference between 'hashOutputSlots' and 'outputSlots' is that + * Calculate the slots output after going through the hash table in the hash + * join node. + * The most essential difference between 'hashOutputSlots' and 'outputSlots' is + * that * it's output needs to contain other conjunct and conjunct columns. * hash output slots = output slots + conjunct slots + other conjunct slots * For example: - * select b.k1 from test.t1 a right join test.t1 b on a.k1=b.k1 and b.k2>1 where a.k2>1; + * select b.k1 from test.t1 a right join test.t1 b on a.k1=b.k1 and b.k2>1 where + * a.k2>1; * output slots: b.k1 * other conjuncts: a.k2>1 * conjuncts: b.k2>1 @@ -327,18 +331,16 @@ public void init(Analyzer analyzer) throws UserException { ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap(); List newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false); - eqJoinConjuncts = - newEqJoinConjuncts.stream().map(entity -> { - BinaryPredicate predicate = (BinaryPredicate) entity; - if (predicate.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) { - Preconditions.checkArgument(predicate.getChildren().size() == 2); - if (!predicate.getChild(0).isNullable() || !predicate.getChild(1).isNullable()) { - predicate.setOp(BinaryPredicate.Operator.EQ); - } - } - return predicate; - } - ).collect(Collectors.toList()); + eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> { + BinaryPredicate predicate = (BinaryPredicate) entity; + if (predicate.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) { + Preconditions.checkArgument(predicate.getChildren().size() == 2); + if (!predicate.getChild(0).isNullable() || !predicate.getChild(1).isNullable()) { + predicate.setOp(BinaryPredicate.Operator.EQ); + } + } + return predicate; + }).collect(Collectors.toList()); otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false); computeOutputTuple(analyzer); @@ -383,7 +385,8 @@ private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs, SlotDes this.rhs = rhs; } - // Convenience functions. They return double to avoid excessive casts in callers. + // Convenience functions. They return double to avoid excessive casts in + // callers. public double lhsNdv() { // return the estimated number of rows in this partition (-1 if unknown) return Math.min(lhs.getStats().getNumDistinctValues(), lhsNumRows()); @@ -414,8 +417,10 @@ public TupleId rhsTid() { } /** - * Returns a new EqJoinConjunctScanSlots for the given equi-join conjunct or null if - * the given conjunct is not of the form = or if the underlying + * Returns a new EqJoinConjunctScanSlots for the given equi-join conjunct or + * null if + * the given conjunct is not of the form = or if the + * underlying * table/column of at least one side is missing stats. */ public static EqJoinConjunctScanSlots create(Expr eqJoinConjunct) { @@ -477,7 +482,8 @@ private long getJoinCardinality() { return lhsCard; } - // Collect join conjuncts that are eligible to participate in cardinality estimation. + // Collect join conjuncts that are eligible to participate in cardinality + // estimation. List eqJoinConjunctSlots = new ArrayList<>(); for (Expr eqJoinConjunct : eqJoinConjuncts) { EqJoinConjunctScanSlots slots = EqJoinConjunctScanSlots.create(eqJoinConjunct); @@ -495,10 +501,13 @@ private long getJoinCardinality() { } /** - * Returns the estimated join cardinality of a generic N:M inner or outer join based - * on the given list of equi-join conjunct slots and the join input cardinalities. + * Returns the estimated join cardinality of a generic N:M inner or outer join + * based + * on the given list of equi-join conjunct slots and the join input + * cardinalities. * The returned result is >= 0. - * The list of join conjuncts must be non-empty and the cardinalities must be >= 0. + * The list of join conjuncts must be non-empty and the cardinalities must be >= + * 0. *

* Generic estimation: * cardinality = |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d)) @@ -517,7 +526,8 @@ private long getGenericJoinCardinality(List eqJoinConju long result = -1; for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) { - // Adjust the NDVs on both sides to account for predicates. Intuitively, the NDVs + // Adjust the NDVs on both sides to account for predicates. Intuitively, the + // NDVs // should only decrease. We ignore adjustments that would lead to an increase. double lhsAdjNdv = slots.lhsNdv(); if (slots.lhsNumRows() > lhsCard) { @@ -541,7 +551,6 @@ private long getGenericJoinCardinality(List eqJoinConju return result; } - @Override public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); @@ -552,25 +561,28 @@ public void computeStats(Analyzer analyzer) throws UserException { @Override protected void computeOldCardinality() { - // For a join between child(0) and child(1), we look for join conditions "L.c = R.d" + // For a join between child(0) and child(1), we look for join conditions "L.c = + // R.d" // (with L being from child(0) and R from child(1)) and use as the cardinality // estimate the maximum of - // child(0).cardinality * R.cardinality / # distinct values for R.d - // * child(1).cardinality / R.cardinality + // child(0).cardinality * R.cardinality / # distinct values for R.d + // * child(1).cardinality / R.cardinality // across all suitable join conditions, which simplifies to - // child(0).cardinality * child(1).cardinality / # distinct values for R.d + // child(0).cardinality * child(1).cardinality / # distinct values for R.d // The reasoning is that // - each row in child(0) joins with R.cardinality/#DV_R.d rows in R // - each row in R is 'present' in child(1).cardinality / R.cardinality rows in - // child(1) + // child(1) // // This handles the very frequent case of a fact table/dimension table join - // (aka foreign key/primary key join) if the primary key is a single column, with + // (aka foreign key/primary key join) if the primary key is a single column, + // with // possible additional predicates against the dimension table. An example: - // FROM FactTbl F JOIN Customers C D ON (F.cust_id = C.id) ... WHERE C.region = 'US' + // FROM FactTbl F JOIN Customers C D ON (F.cust_id = C.id) ... WHERE C.region = + // 'US' // - if there are 5 regions, the selectivity of "C.region = 'US'" would be 0.2 - // and the output cardinality of the Customers scan would be 0.2 * # rows in - // Customers + // and the output cardinality of the Customers scan would be 0.2 * # rows in + // Customers // - # rows in Customers == # of distinct values for Customers.id // - the output cardinality of the join would be F.cardinality * 0.2 @@ -595,12 +607,12 @@ protected void computeOldCardinality() { } long numDistinct = stats.getNumDistinctValues(); // TODO rownum - //Table rhsTbl = slotDesc.getParent().getTableFamilyGroup().getBaseTable(); + // Table rhsTbl = slotDesc.getParent().getTableFamilyGroup().getBaseTable(); // if (rhsTbl != null && rhsTbl.getNumRows() != -1) { // we can't have more distinct values than rows in the table, even though // the metastore stats may think so // LOG.info( - // "#distinct=" + numDistinct + " #rows=" + Long.toString(rhsTbl.getNumRows())); + // "#distinct=" + numDistinct + " #rows=" + Long.toString(rhsTbl.getNumRows())); // numDistinct = Math.min(numDistinct, rhsTbl.getNumRows()); // } maxNumDistinct = Math.max(maxNumDistinct, numDistinct); @@ -649,7 +661,8 @@ private long getNdv(Expr expr) { /** * Returns the estimated cardinality of a semi join node. * For a left semi join between child(0) and child(1), we look for equality join - * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and use as + * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and + * use as * the cardinality estimate the minimum of * |child(0)| * Min(NDV(L.c), NDV(R.d)) / NDV(L.c) * over all suitable join conditions. The reasoning is that: @@ -665,7 +678,8 @@ private long getNdv(Expr expr) { * in child(1) is (NDV(L.c) - NDV(R.d)) / NDV(L.c) * - otherwise, we conservatively use |L| to avoid underestimation *

- * We analogously estimate the cardinality for right semi/anti joins, and treat the + * We analogously estimate the cardinality for right semi/anti joins, and treat + * the * null-aware anti join like a regular anti join */ private long getSemiJoinCardinality() { @@ -752,7 +766,7 @@ public void getMaterializedIds(Analyzer analyzer, List ids) { } } - //nereids only + // nereids only public void addSlotIdToHashOutputSlotIds(SlotId slotId) { hashOutputSlotIds.add(slotId); } @@ -800,20 +814,17 @@ protected void toThrift(TPlanNode msg) { msg.hash_join_node.addToHashOutputSlotIds(slotId.asInt()); } } - if (vSrcToOutputSMap != null) { - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - // TODO: Enable it after we support new optimizers - // if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { - // msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); - // } else - msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { + if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + } + } + if (outputTupleDesc != null) { + msg.hash_join_node.setVoutputTupleId(outputTupleDesc.getId().asInt()); } } - if (vOutputTupleDesc != null) { - msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt()); - // TODO Enable it after we support new optimizers - // msg.setOutputTupleId(vOutputTupleDesc.getId().asInt()); - } + if (vIntermediateTupleDescList != null) { for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList) { msg.hash_join_node.addToVintermediateTupleIdList(tupleDescriptor.getId().asInt()); @@ -830,9 +841,9 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve } else { distrModeStr = distrMode.toString(); } - StringBuilder output = - new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()).append("(") - .append(distrModeStr).append(")").append("[").append(colocateReason).append("]\n"); + StringBuilder output = new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()) + .append("(") + .append(distrModeStr).append(")").append("[").append(colocateReason).append("]\n"); if (detailLevel == TExplainLevel.BRIEF) { output.append(detailPrefix).append( String.format("cardinality=%,d", cardinality)).append("\n"); @@ -862,9 +873,11 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve output.append(getRuntimeFilterExplainString(true)); } output.append(detailPrefix).append(String.format("cardinality=%,d", cardinality)).append("\n"); - // todo unify in plan node - if (vOutputTupleDesc != null) { - output.append(detailPrefix).append("vec output tuple id: ").append(vOutputTupleDesc.getId()).append("\n"); + if (outputTupleDesc != null) { + output.append(detailPrefix).append("vec output tuple id: ").append(outputTupleDesc.getId()).append("\n"); + } + if (outputTupleDesc != null) { + output.append(detailPrefix).append("output tuple id: ").append(outputTupleDesc.getId()).append("\n"); } if (vIntermediateTupleDescList != null) { output.append(detailPrefix).append("vIntermediate tuple ids: "); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java index b635cfda59d992..65f7bca36524b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java @@ -44,7 +44,6 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -57,7 +56,6 @@ public abstract class JoinNodeBase extends PlanNode { protected final TableRef innerRef; protected final JoinOperator joinOp; protected final boolean isMark; - protected TupleDescriptor vOutputTupleDesc; protected ExprSubstitutionMap vSrcToOutputSMap; protected List vIntermediateTupleDescList; @@ -115,7 +113,7 @@ protected boolean isMaterializedByChild(SlotDescriptor slotDesc, ExprSubstitutio protected void computeOutputTuple(Analyzer analyzer) throws UserException { // 1. create new tuple - vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); + outputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); boolean copyLeft = false; boolean copyRight = false; boolean leftNullable = false; @@ -169,7 +167,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { getChild(0) instanceof JoinNodeBase && analyzer.isOuterJoined(leftTupleDesc.getId()); for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) { SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, leftSlotDesc); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, leftSlotDesc); if (leftNullable) { outputSlotDesc.setIsNullable(true); leftNullableNumber++; @@ -189,7 +187,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { getChild(1) instanceof JoinNodeBase && analyzer.isOuterJoined(rightTupleDesc.getId()); for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) { SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, rightSlotDesc); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, rightSlotDesc); if (rightNullable) { outputSlotDesc.setIsNullable(true); rightNullableNumber++; @@ -206,7 +204,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { if (isMarkJoin() && analyzer.needPopUpMarkTuple(innerRef)) { SlotDescriptor markSlot = analyzer.getMarkTuple(innerRef).getSlots().get(0); SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, markSlot); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, markSlot); srcTblRefToOutputTupleSmap.put(new SlotRef(markSlot), new SlotRef(outputSlotDesc)); } @@ -224,7 +222,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { } } - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); // 3. add tupleisnull in null-side Preconditions.checkState(srcTblRefToOutputTupleSmap.getLhs().size() == vSrcToOutputSMap.getLhs().size()); // Condition1: the left child is null-side @@ -265,8 +263,8 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) { outputSlotIds = Lists.newArrayList(); List outputTupleDescList = Lists.newArrayList(); - if (vOutputTupleDesc != null) { - outputTupleDescList.add(vOutputTupleDesc); + if (outputTupleDesc != null) { + outputTupleDescList.add(outputTupleDesc); } else { for (TupleId tupleId : tupleIds) { outputTupleDescList.add(analyzer.getTupleDesc(tupleId)); @@ -296,13 +294,13 @@ public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) @Override public void projectOutputTuple() { - if (vOutputTupleDesc == null) { + if (outputTupleDesc == null) { return; } - if (vOutputTupleDesc.getSlots().size() == outputSlotIds.size()) { + if (outputTupleDesc.getSlots().size() == outputSlotIds.size()) { return; } - Iterator iterator = vOutputTupleDesc.getSlots().iterator(); + Iterator iterator = outputTupleDesc.getSlots().iterator(); while (iterator.hasNext()) { SlotDescriptor slotDescriptor = iterator.next(); boolean keep = false; @@ -318,7 +316,7 @@ public void projectOutputTuple() { vSrcToOutputSMap.removeByRhsExpr(slotRef); } } - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); } protected abstract Pair needToCopyRightAndLeft(); @@ -404,14 +402,14 @@ protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisExcept // 5. replace tuple is null expr TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(), originTidsToIntermediateTidMap); - Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == vOutputTupleDesc.getSlots().size()); + Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == outputTupleDesc.getSlots().size()); List exprs = vSrcToOutputSMap.getLhs(); - ArrayList slots = vOutputTupleDesc.getSlots(); + ArrayList slots = outputTupleDesc.getSlots(); for (int i = 0; i < slots.size(); i++) { slots.get(i).setIsNullable(exprs.get(i).isNullable()); } vSrcToOutputSMap.reCalculateNullableInfoForSlotInRhs(); - vOutputTupleDesc.computeMemLayout(); + outputTupleDesc.computeMemLayout(); } protected abstract List computeSlotIdsForJoinConjuncts(Analyzer analyzer); @@ -479,7 +477,7 @@ public void init(Analyzer analyzer) throws UserException { /** * If parent wants to get join node tupleids, * it will call this function instead of read properties directly. - * The reason is that the tuple id of vOutputTupleDesc the real output tuple id for join node. + * The reason is that the tuple id of outputTupleDesc the real output tuple id for join node. *

* If you read the properties of @tupleids directly instead of this function, * it reads the input id of the current node. @@ -487,16 +485,16 @@ public void init(Analyzer analyzer) throws UserException { @Override public ArrayList getTupleIds() { Preconditions.checkState(tupleIds != null); - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } return tupleIds; } @Override public ArrayList getOutputTblRefIds() { - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } switch (joinOp) { case LEFT_SEMI_JOIN: @@ -513,8 +511,8 @@ public ArrayList getOutputTblRefIds() { @Override public List getOutputTupleIds() { - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } switch (joinOp) { case LEFT_SEMI_JOIN: @@ -544,13 +542,6 @@ public int getNumInstances() { return Math.max(children.get(0).getNumInstances(), children.get(1).getNumInstances()); } - /** - * Used by nereids. - */ - public void setvOutputTupleDesc(TupleDescriptor vOutputTupleDesc) { - this.vOutputTupleDesc = vOutputTupleDesc; - } - /** * Used by nereids. */ @@ -558,13 +549,6 @@ public void setvIntermediateTupleDescList(List vIntermediateTup this.vIntermediateTupleDescList = vIntermediateTupleDescList; } - /** - * Used by nereids. - */ - public void setvSrcToOutputSMap(List lhs) { - this.vSrcToOutputSMap = new ExprSubstitutionMap(lhs, Collections.emptyList()); - } - public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { outputSmap = smap; ExprSubstitutionMap tmpSmap = new ExprSubstitutionMap(Lists.newArrayList(vSrcToOutputSMap.getRhs()), @@ -572,12 +556,12 @@ public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { List newRhs = Lists.newArrayList(); boolean bSmapChanged = false; for (Expr rhsExpr : smap.getRhs()) { - if (rhsExpr instanceof SlotRef || !rhsExpr.isBound(vOutputTupleDesc.getId())) { + if (rhsExpr instanceof SlotRef || !rhsExpr.isBound(outputTupleDesc.getId())) { newRhs.add(rhsExpr); } else { // we need do project in the join node // add a new slot for projection result and add the project expr to vSrcToOutputSMap - SlotDescriptor slotDesc = analyzer.addSlotDescriptor(vOutputTupleDesc); + SlotDescriptor slotDesc = analyzer.addSlotDescriptor(outputTupleDesc); slotDesc.initFromExpr(rhsExpr); slotDesc.setIsMaterialized(true); // the project expr is from smap, which use the slots of hash join node's output tuple @@ -594,7 +578,7 @@ public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { if (bSmapChanged) { outputSmap.updateRhsExprs(newRhs); - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 9feac7e79ff7b1..3fbfe31adca8fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -28,6 +28,7 @@ import org.apache.doris.analysis.TupleId; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TNestedLoopJoinNode; @@ -48,18 +49,24 @@ public class NestedLoopJoinNode extends JoinNodeBase { private static final Logger LOG = LogManager.getLogger(NestedLoopJoinNode.class); - // If isOutputLeftSideOnly=true, the data from the left table is returned directly without a join operation. - // This is used to optimize `in bitmap`, because bitmap will make a lot of copies when doing Nested Loop Join, + // If isOutputLeftSideOnly=true, the data from the left table is returned + // directly without a join operation. + // This is used to optimize `in bitmap`, because bitmap will make a lot of + // copies when doing Nested Loop Join, // which is very resource intensive. // `in bitmap` has two cases: // 1. select * from tbl1 where k1 in (select bitmap_col from tbl2); - // This will generate a bitmap runtime filter to filter the left table, because the bitmap is an exact filter - // and does not need to be filtered again in the NestedLoopJoinNode, so it returns the left table data directly. + // This will generate a bitmap runtime filter to filter the left table, because + // the bitmap is an exact filter + // and does not need to be filtered again in the NestedLoopJoinNode, so it + // returns the left table data directly. // 2. select * from tbl1 where 1 in (select bitmap_col from tbl2); - // This sql will be rewritten to - // "select * from tbl1 left semi join tbl2 where bitmap_contains(tbl2.bitmap_col, 1);" - // return all data in the left table to parent node when there is data on the build side, and return empty when - // there is no data on the build side. + // This sql will be rewritten to + // "select * from tbl1 left semi join tbl2 where + // bitmap_contains(tbl2.bitmap_col, 1);" + // return all data in the left table to parent node when there is data on the + // build side, and return empty when + // there is no data on the build side. private boolean isOutputLeftSideOnly = false; private List runtimeFilterExpr = Lists.newArrayList(); @@ -127,7 +134,7 @@ public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, List