Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ Status HashJoinNode::close(RuntimeState* state) {

bool HashJoinNode::need_more_input_data() const {
return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos &&
!_short_circuit_for_probe;
(!_short_circuit_for_probe || _is_mark_join);
}

void HashJoinNode::prepare_for_next() {
Expand All @@ -450,10 +450,43 @@ void HashJoinNode::prepare_for_next() {
Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) {
SCOPED_TIMER(_probe_timer);
if (_short_circuit_for_probe) {
/// If `_short_circuit_for_probe` is true, no rows match the join condition.
/// For a mark join we still have to output the probe-side rows, so we create
/// the mark column with every row set to 0.
if (_is_mark_join) {
auto block_rows = _probe_block.rows();
if (block_rows == 0) {
*eos = _probe_eos;
return Status::OK();
}

Block temp_block;
// Collect the probe-side columns that are flagged for output.
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
if (_left_output_slot_flags[i]) {
temp_block.insert(_probe_block.get_by_position(i));
}
}
auto mark_column = ColumnUInt8::create(block_rows, 0);
temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""});

{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(
VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns()));
}

RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
temp_block.clear();
release_block_memory(_probe_block);
reached_limit(output_block, eos);
return Status::OK();
}
// If we use a short-circuit strategy, we should return an empty block directly.
*eos = true;
return Status::OK();
}

// TODO: this short-circuit handling could probably be refactored so that it no longer needs to be checked here.
if (_short_circuit_for_probe_and_additional_data) {
// When the build table has 0 rows, there is no other_join_conjunct, and the join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
Expand Down Expand Up @@ -639,7 +672,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
return Status::OK();
}

if (_short_circuit_for_probe) {
if (_short_circuit_for_probe && !_is_mark_join) {
// If we use a short-circuit strategy, we should return an empty block directly.
*eos = true;
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@
24 4
3 3

-- !in_subquery_mark_with_order --
1 \N
1 2
1 3
2 4
2 5
3 3
3 4

-- !exists_subquery_with_order --
1 2
1 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ suite ("sub_query_correlated") {
DROP TABLE IF EXISTS `sub_query_correlated_subquery5`
"""

sql """
DROP TABLE IF EXISTS `sub_query_correlated_subquery6`
"""

sql """
DROP TABLE IF EXISTS `sub_query_correlated_subquery7`
"""

sql """
create table if not exists sub_query_correlated_subquery1
(k1 bigint, k2 bigint)
Expand Down Expand Up @@ -82,6 +90,21 @@ suite ("sub_query_correlated") {
properties('replication_num' = '1')
"""

sql """
create table if not exists sub_query_correlated_subquery6
(k1 bigint, k2 bigint)
duplicate key(k1)
distributed by hash(k2) buckets 1
properties('replication_num' = '1')
"""

sql """
create table if not exists sub_query_correlated_subquery7
(k1 int, k2 varchar(128), k3 bigint, v1 bigint, v2 bigint)
distributed by hash(k2) buckets 1
properties('replication_num' = '1');
"""

sql """
insert into sub_query_correlated_subquery1 values (1,2), (1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4)
"""
Expand All @@ -103,6 +126,15 @@ suite ("sub_query_correlated") {
insert into sub_query_correlated_subquery5 values (5,4), (5,2), (8,3), (5,4), (6,7), (8,9)
"""

sql """
insert into sub_query_correlated_subquery6 values (1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null);
"""

sql """
insert into sub_query_correlated_subquery7 values (1,"abc",2,3,4), (1,"abcd",3,3,4), (2,"xyz",2,4,2),
(2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), (3,"abc",4,5,3), (null,null,null,null,null);
"""

sql "SET enable_fallback_to_original_planner=false"

//------------------Correlated-----------------
Expand Down Expand Up @@ -261,6 +293,10 @@ suite ("sub_query_correlated") {
select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 not in (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by k2);
"""

order_qt_in_subquery_mark_with_order """
select * from sub_query_correlated_subquery6 where sub_query_correlated_subquery6.k1 not in (select sub_query_correlated_subquery7.k3 from sub_query_correlated_subquery7 ) or k1 < 10;
"""

order_qt_exists_subquery_with_order """
select * from sub_query_correlated_subquery1 where exists (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by k2);
"""
Expand Down