Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
// right table data types
_right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc());
_left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc());

_right_table_column_names = VectorizedUtils::get_column_names(child(1)->row_desc());
// Hash Table Init
_hash_table_init(state);
_construct_mutable_join_block();
Expand Down Expand Up @@ -454,6 +454,48 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
*eos = true;
return Status::OK();
}
//TODO: this short circuit maybe could refactor, no need to check at here.
if (_short_circuit_for_probe_and_additional_data) {
// when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
// we could get the result is probe table + null-column(if need output)
// If we use a short-circuit strategy, should return block directly by add additional null data.
auto block_rows = _probe_block.rows();
if (_probe_eos && block_rows == 0) {
*eos = _probe_eos;
return Status::OK();
}

Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
temp_block.insert(_probe_block.get_by_position(i));
}

//create build side null column, if need output
for (int i = 0;
(_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) {
auto type = remove_nullable(_right_table_data_types[i]);
auto column = type->create_column();
column->resize(block_rows);
auto null_map_column = ColumnVector<UInt8>::create(block_rows, 1);
auto nullable_column =
ColumnNullable::create(std::move(column), std::move(null_map_column));
temp_block.insert({std::move(nullable_column), make_nullable(type),
_right_table_column_names[i]});
}

{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(
VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns()));
}

RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
temp_block.clear();
release_block_memory(_probe_block);
reached_limit(output_block, eos);
return Status::OK();
}
_join_block.clear_column_data();

MutableBlock mutable_join_block(&_join_block);
Expand Down Expand Up @@ -528,6 +570,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
if (!st) {
return st;
}

if (_is_outer_join) {
_add_tuple_is_null_column(&temp_block);
}
Expand All @@ -541,8 +584,8 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
// Here make _join_block release the columns' ptr
_join_block.set_columns(_join_block.clone_empty_columns());
mutable_join_block.clear();

RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));

_reset_tuple_is_null_column();
reached_limit(output_block, eos);
return Status::OK();
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/exec/join/vhash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,13 @@ class HashJoinNode final : public VJoinNodeBase {
(_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
(_build_blocks->empty() && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
(_build_blocks->empty() && _join_op == TJoinOp::RIGHT_ANTI_JOIN);

//when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
//we could get the result is probe table + null-column(if need output)
_short_circuit_for_probe_and_additional_data =
(_build_blocks->empty() && !_have_other_join_conjunct && !_is_mark_join) &&
(_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN ||
_join_op == TJoinOp::LEFT_ANTI_JOIN);
}

bool _enable_hash_join_early_start_probe(RuntimeState* state) const;
Expand All @@ -592,6 +599,7 @@ class HashJoinNode final : public VJoinNodeBase {

DataTypes _right_table_data_types;
DataTypes _left_table_data_types;
std::vector<std::string> _right_table_column_names;

RuntimeProfile::Counter* _build_table_timer;
RuntimeProfile::Counter* _build_expr_call_timer;
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/join/vjoin_node_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ class VJoinNodeBase : public ExecNode {
// Materialize build relation. For HashJoin, it will build a hash table while a list of build blocks for NLJoin.
virtual Status _materialize_build_side(RuntimeState* state) = 0;

virtual void _init_short_circuit_for_probe() { _short_circuit_for_probe = false; }
virtual void _init_short_circuit_for_probe() {
_short_circuit_for_probe = false;
_short_circuit_for_probe_and_additional_data = false;
}

TJoinOp::type _join_op;
JoinOpVariants _join_op_variants;
Expand All @@ -124,6 +127,8 @@ class VJoinNodeBase : public ExecNode {
// 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti
bool _short_circuit_for_probe = false;

// for some join, when build side rows is empty, we could return directly by add some additional null data in probe table.
bool _short_circuit_for_probe_and_additional_data = false;
std::unique_ptr<RowDescriptor> _output_row_desc;
std::unique_ptr<RowDescriptor> _intermediate_row_desc;
// output expr
Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/utils/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ class VectorizedUtils {
return data_types;
}

static std::vector<std::string> get_column_names(const RowDescriptor& row_desc) {
std::vector<std::string> column_names;
for (const auto& tuple_desc : row_desc.tuple_descriptors()) {
for (const auto& slot_desc : tuple_desc->slots()) {
column_names.push_back(slot_desc->col_name());
}
}
return column_names;
}

static bool all_arguments_are_constant(const Block& block, const ColumnNumbers& args) {
for (const auto& arg : args) {
if (!is_column_const(*block.get_by_position(arg).column)) {
Expand Down