Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_

/// `_has_null_in_build_side` means have null value in build side.
/// `_short_circuit_for_null_in_build_side` means short circuit if has null in build side(e.g. null aware left anti join).
if (_has_null_in_build_side && _short_circuit_for_null_in_build_side && _is_mark_join) {
if (_has_null_in_build_side && _short_circuit_for_null_in_build_side) {
/// We need to create a column as mark with all rows set to NULL.
auto block_rows = _probe_block.rows();
if (block_rows == 0) {
Expand All @@ -563,10 +563,13 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
temp_block.insert(_probe_block.get_by_position(i));
}
auto mark_column = ColumnNullable::create(ColumnUInt8::create(block_rows, 0),
ColumnUInt8::create(block_rows, 1));
temp_block.insert(
{std::move(mark_column), make_nullable(std::make_shared<DataTypeUInt8>()), ""});

if (_is_mark_join) {
auto mark_column = ColumnNullable::create(ColumnUInt8::create(block_rows, 0),
ColumnUInt8::create(block_rows, 1));
temp_block.insert(
{std::move(mark_column), make_nullable(std::make_shared<DataTypeUInt8>()), ""});
}

{
SCOPED_TIMER(_join_filter_timer);
Expand Down Expand Up @@ -905,6 +908,13 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
}
RETURN_IF_ERROR(sink(state, &block, eos));
}

// For broadcast join, if `sink` is not called with eos,
// other instances will not be signaled.
if (!eos) {
Block tmp_block;
RETURN_IF_ERROR(sink(state, &tmp_block, true));
}
RETURN_IF_ERROR(child(1)->close(state));
} else {
RETURN_IF_ERROR(child(1)->close(state));
Expand Down