From 8ead17ea2fb0e6cbc7a2f9766ad60be39fd9dfd3 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Thu, 31 Aug 2023 18:34:11 +0800 Subject: [PATCH 1/7] [refactor](join) improve join node output when build table rows is 0 --- be/src/vec/exec/join/vhash_join_node.cpp | 40 ++++++++++++++++++++++++ be/src/vec/exec/join/vhash_join_node.h | 6 ++++ be/src/vec/exec/join/vjoin_node_base.h | 8 ++++- 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 55985d6fed8ef1..6e2d0991c9d06c 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -454,6 +454,46 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ *eos = true; return Status::OK(); } + //TODO: this short circuit maybe could refactor, no need to check at here. + if (_short_circuit_for_probe_and_additional_data) { + // when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN + // we could get the result is probe table + null-column(if need output) + // If we use a short-circuit strategy, should return block directly by add additional null data. + auto block_rows = _probe_block.rows(); + if (_probe_eos) { + *eos = _probe_eos; + return Status::OK(); + } + if (!output_block->mem_reuse()) { + *output_block = VectorizedUtils::create_empty_columnswithtypename(row_desc()); + } + + int column_idx = 0; + //get probe side output column + for (int i = 0; i < _left_output_slot_flags.size(); ++i) { + if (_left_output_slot_flags[i]) { + auto result_column_id = -1; + RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(&_probe_block, &result_column_id)); + auto column = _probe_block.get_by_position(result_column_id).column; + output_block->replace_by_position(column_idx++, std::move(column)); + } + } + _probe_block.clear(); + //create build side null column, if need output + for (int i = 0; i < _right_output_slot_flags.size(); ++i) { + if (_right_output_slot_flags[i]) { + auto column = remove_nullable(_right_table_data_types[i])->create_column(); + column->resize(block_rows); + auto null_map_column = ColumnVector::create(block_rows, 1); + auto nullable_column = + ColumnNullable::create(std::move(column), std::move(null_map_column)); + output_block->replace_by_position(column_idx++, std::move(nullable_column)); + } + } + reached_limit(output_block, eos); + return Status::OK(); + } + _join_block.clear_column_data(); MutableBlock mutable_join_block(&_join_block); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 28cf86765d0ea1..f91404ad585474 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -569,6 +569,12 @@ class HashJoinNode final : public VJoinNodeBase { (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) || (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_SEMI_JOIN) || (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_ANTI_JOIN); + //when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN + //we could get the result is probe table + null-column(if need output) + _short_circuit_for_probe_and_additional_data = + (_build_blocks->empty() && !_have_other_join_conjunct) && + (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN || + _join_op == TJoinOp::LEFT_ANTI_JOIN); } bool _enable_hash_join_early_start_probe(RuntimeState* state) const; diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 9864a275840789..8383ef03174863 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -97,7 +97,10 @@ class VJoinNodeBase : public ExecNode { // Materialize build relation. For HashJoin, it will build a hash table while a list of build blocks for NLJoin. virtual Status _materialize_build_side(RuntimeState* state) = 0; - virtual void _init_short_circuit_for_probe() { _short_circuit_for_probe = false; } + virtual void _init_short_circuit_for_probe() { + _short_circuit_for_probe = false; + _short_circuit_for_probe_and_additional_data = false; + } TJoinOp::type _join_op; JoinOpVariants _join_op_variants; @@ -124,6 +127,9 @@ class VJoinNodeBase : public ExecNode { // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti bool _short_circuit_for_probe = false; + // for some join, when build side rows is empty, we could return directly by add some additional null data in probe table. + bool _short_circuit_for_probe_and_additional_data = false; + std::unique_ptr _output_row_desc; std::unique_ptr _intermediate_row_desc; // output expr From 0c7c9d355136743aaddc99c4aedf0ca963a534d0 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Fri, 1 Sep 2023 11:55:45 +0800 Subject: [PATCH 2/7] update review --- be/src/vec/exec/join/vhash_join_node.cpp | 5 +++++ be/src/vec/exec/join/vhash_join_node.h | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 6e2d0991c9d06c..d8c08a79d7a36e 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -490,6 +490,11 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ output_block->replace_by_position(column_idx++, std::move(nullable_column)); } } + { + SCOPED_TIMER(_join_filter_timer); + RETURN_IF_ERROR( + VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); + } reached_limit(output_block, eos); return Status::OK(); } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index f91404ad585474..ed7c67c4497962 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -572,7 +572,7 @@ class HashJoinNode final : public VJoinNodeBase { //when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN //we could get the result is probe table + null-column(if need output) _short_circuit_for_probe_and_additional_data = - (_build_blocks->empty() && !_have_other_join_conjunct) && + (_build_blocks->empty() && !_have_other_join_conjunct && !_is_mark_join) && (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN || _join_op == TJoinOp::LEFT_ANTI_JOIN); } From 4041806bc65f7faf4ebdeb3887774b451a8fdf40 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Fri, 1 Sep 2023 18:45:14 +0800 Subject: [PATCH 3/7] update --- be/src/vec/exec/join/vhash_join_node.cpp | 34 +++++++++++++----------- be/src/vec/exec/join/vhash_join_node.h | 3 ++- be/src/vec/exec/join/vjoin_node_base.h | 1 - 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index d8c08a79d7a36e..01cdf31c40e5d0 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -464,41 +464,41 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ *eos = _probe_eos; return Status::OK(); } - if (!output_block->mem_reuse()) { - *output_block = VectorizedUtils::create_empty_columnswithtypename(row_desc()); - } - int column_idx = 0; + Block temp_block; //get probe side output column for (int i = 0; i < _left_output_slot_flags.size(); ++i) { if (_left_output_slot_flags[i]) { - auto result_column_id = -1; - RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(&_probe_block, &result_column_id)); - auto column = _probe_block.get_by_position(result_column_id).column; - output_block->replace_by_position(column_idx++, std::move(column)); + temp_block.insert(_probe_block.get_by_position(i)); } } - _probe_block.clear(); + //create build side null column, if need output - for (int i = 0; i < _right_output_slot_flags.size(); ++i) { + for (int i = 0; + (_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) { if (_right_output_slot_flags[i]) { - auto column = remove_nullable(_right_table_data_types[i])->create_column(); + auto type = remove_nullable(_right_table_data_types[i]); + auto column = type->create_column(); column->resize(block_rows); auto null_map_column = ColumnVector::create(block_rows, 1); auto nullable_column = ColumnNullable::create(std::move(column), std::move(null_map_column)); - output_block->replace_by_position(column_idx++, std::move(nullable_column)); + temp_block.insert( + {std::move(nullable_column), make_nullable(type), "right-null-column"}); } } + { SCOPED_TIMER(_join_filter_timer); RETURN_IF_ERROR( - VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); + VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns())); } + + RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false)); + _probe_block.clear(); reached_limit(output_block, eos); return Status::OK(); } - _join_block.clear_column_data(); MutableBlock mutable_join_block(&_join_block); @@ -573,6 +573,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ if (!st) { return st; } + LOG(INFO) << " 44444 " << temp_block.dump_data(); if (_is_outer_join) { _add_tuple_is_null_column(&temp_block); } @@ -586,8 +587,9 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ // Here make _join_block release the columns' ptr _join_block.set_columns(_join_block.clone_empty_columns()); mutable_join_block.clear(); - + LOG(INFO) << " 555555 " << temp_block.dump_data(); RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false)); + LOG(INFO) << " 666666 " << output_block->dump_data(); _reset_tuple_is_null_column(); reached_limit(output_block, eos); return Status::OK(); @@ -666,7 +668,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - + LOG(INFO) << "need_more_input_data() push _probe_eos " << _probe_eos; RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos)); } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index ed7c67c4497962..334e898b39b3d1 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -569,7 +569,8 @@ class HashJoinNode final : public VJoinNodeBase { (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) || (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_SEMI_JOIN) || (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_ANTI_JOIN); - //when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN + + //when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN //we could get the result is probe table + null-column(if need output) _short_circuit_for_probe_and_additional_data = (_build_blocks->empty() && !_have_other_join_conjunct && !_is_mark_join) && diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 8383ef03174863..234374e3c0e09b 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -129,7 +129,6 @@ class VJoinNodeBase : public ExecNode { // for some join, when build side rows is empty, we could return directly by add some additional null data in probe table. bool _short_circuit_for_probe_and_additional_data = false; - std::unique_ptr _output_row_desc; std::unique_ptr _intermediate_row_desc; // output expr From 97677cd01bba365fe992b3c1948d8ed2ecb59f00 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Fri, 1 Sep 2023 19:23:40 +0800 Subject: [PATCH 4/7] update2 --- be/src/vec/exec/join/vhash_join_node.cpp | 29 ++++++++++-------------- be/src/vec/exec/join/vhash_join_node.h | 1 + be/src/vec/utils/util.hpp | 10 ++++++++ 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 01cdf31c40e5d0..c8b2624d9bdd0c 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -402,7 +402,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { // right table data types _right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc()); _left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc()); - + _right_table_column_names = VectorizedUtils::get_column_names(child(1)->row_desc()); // Hash Table Init _hash_table_init(state); _construct_mutable_join_block(); @@ -468,24 +468,20 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ Block temp_block; //get probe side output column for (int i = 0; i < _left_output_slot_flags.size(); ++i) { - if (_left_output_slot_flags[i]) { - temp_block.insert(_probe_block.get_by_position(i)); - } + temp_block.insert(_probe_block.get_by_position(i)); } //create build side null column, if need output for (int i = 0; (_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) { - if (_right_output_slot_flags[i]) { - auto type = remove_nullable(_right_table_data_types[i]); - auto column = type->create_column(); - column->resize(block_rows); - auto null_map_column = ColumnVector::create(block_rows, 1); - auto nullable_column = - ColumnNullable::create(std::move(column), std::move(null_map_column)); - temp_block.insert( - {std::move(nullable_column), make_nullable(type), "right-null-column"}); - } + auto type = remove_nullable(_right_table_data_types[i]); + auto column = type->create_column(); + column->resize(block_rows); + auto null_map_column = ColumnVector::create(block_rows, 1); + auto nullable_column = + ColumnNullable::create(std::move(column), std::move(null_map_column)); + temp_block.insert({std::move(nullable_column), make_nullable(type), + _right_table_column_names[i]}); } { @@ -573,7 +569,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ if (!st) { return st; } - LOG(INFO) << " 44444 " << temp_block.dump_data(); + if (_is_outer_join) { _add_tuple_is_null_column(&temp_block); } @@ -587,9 +583,8 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ // Here make _join_block release the columns' ptr _join_block.set_columns(_join_block.clone_empty_columns()); mutable_join_block.clear(); - LOG(INFO) << " 555555 " << temp_block.dump_data(); RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false)); - LOG(INFO) << " 666666 " << output_block->dump_data(); + _reset_tuple_is_null_column(); reached_limit(output_block, eos); return Status::OK(); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 334e898b39b3d1..fe5139d4670ffa 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -599,6 +599,7 @@ class HashJoinNode final : public VJoinNodeBase { DataTypes _right_table_data_types; DataTypes _left_table_data_types; + std::vector _right_table_column_names; RuntimeProfile::Counter* _build_table_timer; RuntimeProfile::Counter* _build_expr_call_timer; diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp index a57e5c17057ca8..440bbff1538320 100644 --- a/be/src/vec/utils/util.hpp +++ b/be/src/vec/utils/util.hpp @@ -123,6 +123,16 @@ class VectorizedUtils { return data_types; } + static std::vector get_column_names(const RowDescriptor& row_desc) { + std::vector column_names; + for (const auto& tuple_desc : row_desc.tuple_descriptors()) { + for (const auto& slot_desc : tuple_desc->slots()) { + column_names.push_back(slot_desc->col_name()); + } + } + return column_names; + } + static bool all_arguments_are_constant(const Block& block, const ColumnNumbers& args) { for (const auto& arg : args) { if (!is_column_const(*block.get_by_position(arg).column)) { From c4eb6970296351c6e54af1551db6d4889840354b Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Fri, 1 Sep 2023 19:25:26 +0800 Subject: [PATCH 5/7] remove log --- be/src/vec/exec/join/vhash_join_node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index c8b2624d9bdd0c..13bba7ee8072bd 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -663,7 +663,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - LOG(INFO) << "need_more_input_data() push _probe_eos " << _probe_eos; + RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos)); } From 5de4cacf1b2d522c0d62a31bbba5af6d93e348fa Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Sat, 2 Sep 2023 16:31:54 +0800 Subject: [PATCH 6/7] fix eos with rows --- be/src/vec/exec/join/vhash_join_node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 13bba7ee8072bd..d4f3d4b64603c7 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -460,7 +460,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ // we could get the result is probe table + null-column(if need output) // If we use a short-circuit strategy, should return block directly by add additional null data. auto block_rows = _probe_block.rows(); - if (_probe_eos) { + if (_probe_eos && block_rows == 0) { *eos = _probe_eos; return Status::OK(); } From 52b6cf28e62d568e4ce60c87df9c9f13d61e3e86 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Sat, 2 Sep 2023 20:38:52 +0800 Subject: [PATCH 7/7] fix clear block --- be/src/vec/exec/join/vhash_join_node.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index d4f3d4b64603c7..40258339044b5b 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -491,7 +491,8 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ } RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false)); - _probe_block.clear(); + temp_block.clear(); + release_block_memory(_probe_block); reached_limit(output_block, eos); return Status::OK(); }