Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,24 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
null_column.get_nested_column().insert_range_from(*from, 0, rows);
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
} else {
to = make_nullable(from, false)->assume_mutable();
}
} else {
if (_keep_origin || !from->is_exclusive()) {
to->insert_range_from(*from, 0, rows);
} else {
to = from->assume_mutable();
}
}
};

using namespace vectorized;
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
Expand All @@ -535,13 +553,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
->insert_range_from_not_nullable(*column_ptr, 0, rows);
} else {
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ class ExecNode {

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;

bool _keep_origin = false;

private:
static Status create_tree_helper(RuntimeState* state, ObjectPool* pool,
const std::vector<TPlanNode>& tnodes,
Expand Down
80 changes: 74 additions & 6 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

#include "join_probe_operator.h"

#include <memory>

#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/operator.h"
Expand Down Expand Up @@ -82,6 +87,14 @@ template <typename SharedStateArg, typename Derived>
Status JoinProbeLocalState<SharedStateArg, Derived>::_build_output_block(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_output_block' has cognitive complexity of 55 (threshold 50) [readability-function-cognitive-complexity]

Status JoinProbeLocalState<SharedStateArg, Derived>::_build_output_block(
                                                     ^
Additional context

be/src/pipeline/exec/join_probe_operator.cpp:89: +1, including nesting penalty of 0, nesting level increased to 1

    if (!Base::_projections.empty()) {
    ^

be/src/pipeline/exec/join_probe_operator.cpp:91: +2, including nesting penalty of 1, nesting level increased to 2

        if (p._is_outer_join) {
        ^

be/src/pipeline/exec/join_probe_operator.cpp:100: +1, including nesting penalty of 0, nesting level increased to 1

            is_mem_reuse ? vectorized::MutableBlock(output_block)
                         ^

be/src/pipeline/exec/join_probe_operator.cpp:108: nesting level increased to 1

    auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
                               ^

be/src/pipeline/exec/join_probe_operator.cpp:109: +2, including nesting penalty of 1, nesting level increased to 2

        if (to->is_nullable() && !from->is_nullable()) {
        ^

be/src/pipeline/exec/join_probe_operator.cpp:109: +1

        if (to->is_nullable() && !from->is_nullable()) {
                              ^

be/src/pipeline/exec/join_probe_operator.cpp:110: +3, including nesting penalty of 2, nesting level increased to 3

            if (keep_origin || !from->is_exclusive()) {
            ^

be/src/pipeline/exec/join_probe_operator.cpp:110: +1

            if (keep_origin || !from->is_exclusive()) {
                            ^

be/src/pipeline/exec/join_probe_operator.cpp:114: +1, nesting level increased to 3

            } else {
              ^

be/src/pipeline/exec/join_probe_operator.cpp:117: +1, nesting level increased to 2

        } else {
          ^

be/src/pipeline/exec/join_probe_operator.cpp:118: +3, including nesting penalty of 2, nesting level increased to 3

            if (keep_origin || !from->is_exclusive()) {
            ^

be/src/pipeline/exec/join_probe_operator.cpp:118: +1

            if (keep_origin || !from->is_exclusive()) {
                            ^

be/src/pipeline/exec/join_probe_operator.cpp:120: +1, nesting level increased to 3

            } else {
              ^

be/src/pipeline/exec/join_probe_operator.cpp:125: +2, including nesting penalty of 1, nesting level increased to 2

        if (to->is_nullable()) {
        ^

be/src/pipeline/exec/join_probe_operator.cpp:128: +3, including nesting penalty of 2, nesting level increased to 3

            for (int i = 0; i < null_column.size(); i++) {
            ^

be/src/pipeline/exec/join_probe_operator.cpp:129: +4, including nesting penalty of 3, nesting level increased to 4

                if (!null_column.is_null_at(i)) {
                ^

be/src/pipeline/exec/join_probe_operator.cpp:139: nesting level increased to 1

    auto print = [](vectorized::VExprContextSPtrs& ctxs) {
                 ^

be/src/pipeline/exec/join_probe_operator.cpp:147: nesting level increased to 1

    auto print_desc = [](std::unique_ptr<RowDescriptor>& desc) {
                      ^

be/src/pipeline/exec/join_probe_operator.cpp:149: +2, including nesting penalty of 1, nesting level increased to 2

        if (!desc) {
        ^

be/src/pipeline/exec/join_probe_operator.cpp:151: +1, nesting level increased to 2

        } else {
          ^

be/src/pipeline/exec/join_probe_operator.cpp:166: +1, including nesting penalty of 0, nesting level increased to 1

    if (rows != 0) {
    ^

be/src/pipeline/exec/join_probe_operator.cpp:168: +2, including nesting penalty of 1, nesting level increased to 2

        if (_output_expr_ctxs.empty()) {
        ^

be/src/pipeline/exec/join_probe_operator.cpp:171: +3, including nesting penalty of 2, nesting level increased to 3

            for (int i = 0; i < mutable_columns.size(); ++i) {
            ^

be/src/pipeline/exec/join_probe_operator.cpp:175: +1, nesting level increased to 2

        } else {
          ^

be/src/pipeline/exec/join_probe_operator.cpp:179: +3, including nesting penalty of 2, nesting level increased to 3

            for (int i = 0; i < mutable_columns.size(); ++i) {
            ^

be/src/pipeline/exec/join_probe_operator.cpp:181: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id));
                ^

be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/exec/join_probe_operator.cpp:181: +5, including nesting penalty of 4, nesting level increased to 5

                RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id));
                ^

be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/exec/join_probe_operator.cpp:189: +4, including nesting penalty of 3, nesting level increased to 4

                if (is_column_const(*origin_column) ||
                ^

be/src/pipeline/exec/join_probe_operator.cpp:189: +1

                if (is_column_const(*origin_column) ||
                                                    ^

be/src/pipeline/exec/join_probe_operator.cpp:193: +1, nesting level increased to 4

                } else {
                  ^

vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) {
auto& p = Base::_parent->template cast<typename Derived::Parent>();
if (!Base::_projections.empty()) {
*output_block = *origin_block;
if (p._is_outer_join) {
DCHECK(output_block->columns() >= 2);
output_block->erase_tail(output_block->columns() - 2);
}
return Status::OK();
}
SCOPED_TIMER(_build_output_block_timer);
auto is_mem_reuse = output_block->mem_reuse();
vectorized::MutableBlock mutable_block =
Expand All @@ -93,10 +106,10 @@ Status JoinProbeLocalState<SharedStateArg, Derived>::_build_output_block(
// TODO: After FE plan support same nullable of output expr and origin block and mutable column
// we should replace `insert_column_datas` by `insert_range_from`

auto insert_column_datas = [keep_origin](auto& to, vectorized::ColumnPtr& from, size_t rows) {
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (keep_origin || !from->is_exclusive()) {
auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
auto& null_column = assert_cast<vectorized::ColumnNullable&>(*to);
null_column.get_nested_column().insert_range_from(*from, 0, rows);
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
} else {
Expand All @@ -109,7 +122,48 @@ Status JoinProbeLocalState<SharedStateArg, Derived>::_build_output_block(
to = from->assume_mutable();
}
}

if (to->is_nullable()) {
auto& null_column = assert_cast<vectorized::ColumnNullable&>(*to);
bool all_null = true;
for (int i = 0; i < null_column.size(); i++) {
if (!null_column.is_null_at(i)) {
all_null = false;
}
}
LOG_WARNING("yxc test null").tag("null node id", p._node_id).tag("all null", all_null);
}
};

LOG_WARNING("yxc test empty copy");

auto print = [](vectorized::VExprContextSPtrs& ctxs) {
std::string s = "[ ";
for (auto& ctx : ctxs) {
s += ctx->root()->debug_string() + "\t\t\t";
}
return s + " ]";
};

auto print_desc = [](std::unique_ptr<RowDescriptor>& desc) {
std::string ret;
if (!desc) {
ret = "is null";
} else {
ret = desc->debug_string();
}
return ret;
};

LOG_WARNING("yxc test ")
.tag("node id", p._node_id)
.tag("Base::_projections", print(Base::_projections))
.tag("_output_expr_ctxs", print(_output_expr_ctxs))
.tag("output_block", mutable_block.dump_names() + " " + mutable_block.dump_types())
.tag("orgin block ", origin_block->dump_structure())
.tag("_intermediate_row_desc", print_desc(p._intermediate_row_desc))
.tag("plan node", print_desc(p._output_row_descriptor))
.tag("join node", print_desc(p._output_row_desc));
if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
if (_output_expr_ctxs.empty()) {
Expand Down Expand Up @@ -197,18 +251,32 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())));
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
if (!Base::_output_row_descriptor) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
} else {
if (tnode.hash_join_node.__isset.voutput_tuple_id) {
throw Exception {ErrorCode::INTERNAL_ERROR,
"yxc test error .__isset.voutput_tuple_id"};
}
}

} else if (tnode.__isset.nested_loop_join_node) {
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size())));
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
if (!Base::_output_row_descriptor) {
_output_row_desc.reset(new RowDescriptor(
descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
}
} else {
// Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN.
DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN);
}

if (Base::_node_id == 13) {
LOG_WARNING("yxc test");
}
}

template <typename LocalStateType>
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ class JoinProbeOperatorX : public StatefulOperatorX<LocalStateType> {
Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status open(doris::RuntimeState* state) override;
[[nodiscard]] const RowDescriptor& row_desc() const override { return *_output_row_desc; }
[[nodiscard]] const RowDescriptor& row_desc() const override {
if (Base::_output_row_descriptor) {
return *Base::_output_row_descriptor;
}
return *_output_row_desc;
}

[[nodiscard]] const RowDescriptor& intermediate_row_desc() const override {
return *_intermediate_row_desc;
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,9 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con
: JoinProbeOperatorX<NestedLoopJoinProbeLocalState>(pool, tnode, operator_id, descs),
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only &&
tnode.nested_loop_join_node.is_output_left_side_only),
_old_version_flag(!tnode.__isset.nested_loop_join_node) {}
_old_version_flag(!tnode.__isset.nested_loop_join_node) {
_keep_origin = _is_output_left_side_only;
}

Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::init(tnode, state));
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class NestedLoopJoinProbeOperatorX final
const RowDescriptor& row_desc() const override {
return _old_version_flag
? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor)
: *_output_row_desc;
: (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc);
}

bool need_more_input_data(RuntimeState* state) const override;
Expand Down
30 changes: 21 additions & 9 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,28 @@ void PipelineXLocalStateBase::clear_origin_block() {

Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
vectorized::Block* output_block) const {
auto local_state = state->get_local_state(operator_id());
auto* local_state = state->get_local_state(operator_id());
SCOPED_TIMER(local_state->exec_time_counter());
SCOPED_TIMER(local_state->_projection_timer);

auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
null_column.get_nested_column().insert_range_from(*from, 0, rows);
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
} else {
to = make_nullable(from, false)->assume_mutable();
}
} else {
if (_keep_origin || !from->is_exclusive()) {
to->insert_range_from(*from, 0, rows);
} else {
to = from->assume_mutable();
}
}
};

using namespace vectorized;
vectorized::MutableBlock mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
Expand All @@ -189,14 +208,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
->insert_range_from_not_nullable(*column_ptr, 0, rows);
} else {
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ class OperatorXBase : public OperatorBase {
std::string _op_name;
bool _ignore_data_distribution = false;
int _parallel_tasks = 0;

bool _keep_origin = false;
};

template <typename LocalStateType>
Expand Down
20 changes: 16 additions & 4 deletions be/src/vec/exec/join/vjoin_node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,18 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des
}

if (tnode.__isset.hash_join_node) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
if (!_output_row_descriptor) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
}
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())));
} else if (tnode.__isset.nested_loop_join_node) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
if (!_output_row_descriptor) {
_output_row_desc.reset(new RowDescriptor(
descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
}
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size())));
Expand Down Expand Up @@ -166,6 +170,14 @@ void VJoinNodeBase::_construct_mutable_join_block() {
Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block,
bool keep_origin) {
SCOPED_TIMER(_build_output_block_timer);
if (!_projections.empty()) {
*output_block = *origin_block;
if (_is_outer_join) {
DCHECK(output_block->columns() >= 2);
output_block->erase_tail(output_block->columns() - 2);
}
return Status::OK();
}
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
is_mem_reuse
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/join/vjoin_node_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ class VJoinNodeBase : public ExecNode {

Status open(RuntimeState* state) override;

const RowDescriptor& row_desc() const override { return *_output_row_desc; }
const RowDescriptor& row_desc() const override {
if (_output_row_descriptor) {
return *_output_row_descriptor;
}
return *_output_row_desc;
}

const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; }

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/join/vnested_loop_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {

if (tnode.nested_loop_join_node.__isset.is_output_left_side_only) {
_is_output_left_side_only = tnode.nested_loop_join_node.is_output_left_side_only;
_keep_origin = _is_output_left_side_only;
}

if (tnode.nested_loop_join_node.__isset.join_conjuncts &&
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/join/vnested_loop_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
const RowDescriptor& row_desc() const override {
return _old_version_flag
? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor)
: *_output_row_desc;
: (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc);
}

std::shared_ptr<Block> get_left_block() { return _left_block; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds,
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setExecutor(executor);
ctx.setQueryId(queryId);
ctx.getState().setNereids(true);
command.run(ctx, executor);
if (ctx.getState().getStateType() != MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1539,8 +1539,8 @@ public PlanFragment visitPhysicalHashJoin(
TupleDescriptor outputDescriptor = context.generateTupleDesc();
outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s));

hashJoinNode.setvOutputTupleDesc(outputDescriptor);
hashJoinNode.setvSrcToOutputSMap(srcToOutput);
hashJoinNode.setOutputTupleDesc(outputDescriptor);
hashJoinNode.setProjectList(srcToOutput);
}
if (hashJoin.getStats() != null) {
hashJoinNode.setCardinality((long) hashJoin.getStats().getRowCount());
Expand Down Expand Up @@ -1720,8 +1720,8 @@ public PlanFragment visitPhysicalNestedLoopJoin(
TupleDescriptor outputDescriptor = context.generateTupleDesc();
outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s));

nestedLoopJoinNode.setvOutputTupleDesc(outputDescriptor);
nestedLoopJoinNode.setvSrcToOutputSMap(srcToOutput);
nestedLoopJoinNode.setOutputTupleDesc(outputDescriptor);
nestedLoopJoinNode.setProjectList(srcToOutput);
}
if (nestedLoopJoin.getStats() != null) {
nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount());
Expand Down Expand Up @@ -1847,8 +1847,8 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
if (inputPlanNode instanceof JoinNodeBase) {
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
JoinNodeBase joinNode = (JoinNodeBase) inputPlanNode;
joinNode.setvOutputTupleDesc(tupleDescriptor);
joinNode.setvSrcToOutputSMap(projectionExprs);
joinNode.setOutputTupleDesc(tupleDescriptor);
joinNode.setProjectList(projectionExprs);
// prune the hashOutputSlotIds
if (joinNode instanceof HashJoinNode) {
((HashJoinNode) joinNode).getHashOutputSlotIds().clear();
Expand Down
Loading