Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
vectorized::MutableBlock::build_mutable_block(&tmp_build_block);
}

if (in_block->rows() != 0) {
if (!in_block->empty()) {
std::vector<int> res_col_ids(_build_expr_ctxs.size());
RETURN_IF_ERROR(local_state._do_evaluate(*in_block, local_state._build_expr_ctxs,
*local_state._build_expr_call_timer,
Expand All @@ -533,27 +533,15 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

local_state._mem_tracker->consume(in_block->bytes());
COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes());
local_state._build_blocks.emplace_back(std::move(*in_block));

SCOPED_TIMER(local_state._build_side_merge_block_timer);
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
std::move(*in_block)));
}
}

if (local_state._should_build_hash_table && eos) {
DCHECK(!local_state._build_side_mutable_block.empty());

for (auto& column : local_state._build_side_mutable_block.mutable_columns()) {
column->reserve(local_state._build_side_rows);
}

{
SCOPED_TIMER(local_state._build_side_merge_block_timer);
for (auto& block : local_state._build_blocks) {
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(block));

vectorized::Block temp;
std::swap(block, temp);
}
}

local_state._shared_state->build_block = std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class HashJoinBuildSinkLocalState final
int64_t _build_side_last_mem_used = 0;

size_t _build_side_rows = 0;
std::vector<vectorized::Block> _build_blocks;

vectorized::MutableBlock _build_side_mutable_block;
std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots;
Expand Down
44 changes: 24 additions & 20 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,33 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
}

Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_revoke_unpartitioned_block' has cognitive complexity of 52 (threshold 50) [readability-function-cognitive-complexity]

Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
                                          ^
Additional context

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:108: +1, including nesting penalty of 0, nesting level increased to 1

    if (inner_sink_state_) {
    ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:113: +1, including nesting penalty of 0, nesting level increased to 1

    if (build_block.rows() <= 1) {
    ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:119: +1, including nesting penalty of 0, nesting level increased to 1

    if (build_block.columns() > num_slots) {
    ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:123: nesting level increased to 1

    auto spill_func = [build_block = std::move(build_block), state, this]() mutable {
                      ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:130: nesting level increased to 2

                      [](std::vector<uint32_t>& indices) { indices.reserve(reserved_size); });
                      ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:132: nesting level increased to 2

        auto flush_rows = [&state, this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
                          ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:137: +3, including nesting penalty of 2, nesting level increased to 3

            if (!status.ok()) {
            ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:149: +2, including nesting penalty of 1, nesting level increased to 2

        while (offset < total_rows) {
        ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:153: +3, including nesting penalty of 2, nesting level increased to 3

            for (size_t i = 0; i != build_block.columns(); ++i) {
            ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:167: +3, including nesting penalty of 2, nesting level increased to 3

            for (size_t i = 0; i != sub_block.rows(); ++i) {
            ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:171: +3, including nesting penalty of 2, nesting level increased to 3

            for (uint32_t partition_idx = 0; partition_idx != p._partition_count; ++partition_idx) {
            ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:177: +4, including nesting penalty of 3, nesting level increased to 4

                if (UNLIKELY(!partition_block)) {
                ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:185: +4, including nesting penalty of 3, nesting level increased to 4

                    if (!st.ok()) {
                    ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:195: +4, including nesting penalty of 3, nesting level increased to 4

                if (partition_block->rows() >= reserved_size || is_last_block) {
                ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:195: +1

                if (partition_block->rows() >= reserved_size || is_last_block) {
                                                             ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:196: +5, including nesting penalty of 4, nesting level increased to 5

                    if (!flush_rows(partition_block, spilling_stream)) {
                    ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:208: nesting level increased to 1

    auto exception_catch_func = [spill_func, this]() mutable {
                                ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:209: nesting level increased to 2

        auto status = [&]() {
                      ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:210: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_CATCH_EXCEPTION(spill_func());
            ^

be/src/common/exception.h:89: expanded from macro 'RETURN_IF_CATCH_EXCEPTION'

    do {                                                                                         \
    ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:210: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_CATCH_EXCEPTION(spill_func());
            ^

be/src/common/exception.h:94: expanded from macro 'RETURN_IF_CATCH_EXCEPTION'

        } catch (const doris::Exception& e) {                                                    \
          ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:210: +5, including nesting penalty of 4, nesting level increased to 5

            RETURN_IF_CATCH_EXCEPTION(spill_func());
            ^

be/src/common/exception.h:95: expanded from macro 'RETURN_IF_CATCH_EXCEPTION'

            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {                                \
            ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:214: +2, including nesting penalty of 1, nesting level increased to 2

        if (!status.ok()) {
        ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:228: +1, including nesting penalty of 0, nesting level increased to 1

    DBUG_EXECUTE_IF(
    ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:228: +2, including nesting penalty of 1, nesting level increased to 2

    DBUG_EXECUTE_IF(
    ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

_shared_state->need_to_spill = true;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
const auto num_slots = row_desc.num_slots();
std::vector<vectorized::Block> build_blocks;
vectorized::Block build_block;
auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
if (inner_sink_state_) {
auto inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
build_blocks = std::move(inner_sink_state->_build_blocks);
build_block = inner_sink_state->_build_side_mutable_block.to_block();
}

if (build_blocks.empty()) {
if (build_block.rows() <= 1) {
LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
<< ", task: " << state->task_id();
return Status::OK();
}

auto spill_func = [build_blocks = std::move(build_blocks), state, num_slots, this]() mutable {
if (build_block.columns() > num_slots) {
build_block.erase(num_slots);
}

auto spill_func = [build_block = std::move(build_block), state, this]() mutable {
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count);

const auto reserved_size = 4096;
const size_t reserved_size = 4096;
std::for_each(partitions_indexes.begin(), partitions_indexes.end(),
[](std::vector<uint32_t>& indices) { indices.reserve(reserved_size); });

Expand All @@ -142,24 +145,27 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
return true;
};

for (size_t block_idx = 0; block_idx != build_blocks.size(); ++block_idx) {
auto& build_block = build_blocks[block_idx];
const auto is_last_block = (block_idx == (build_blocks.size() - 1));
if (UNLIKELY(build_block.empty())) {
continue;
}
size_t total_rows = build_block.rows();
size_t offset = 1;
while (offset < total_rows) {
auto sub_block = build_block.clone_empty();
size_t this_run = std::min(reserved_size, total_rows - offset);

if (build_block.columns() > num_slots) {
build_block.erase(num_slots);
for (size_t i = 0; i != build_block.columns(); ++i) {
sub_block.get_by_position(i).column =
build_block.get_by_position(i).column->cut(offset, this_run);
}

offset += this_run;
const auto is_last_block = offset == total_rows;

{
SCOPED_TIMER(_partition_timer);
(void)_partitioner->do_partitioning(state, &build_block, _mem_tracker.get());
(void)_partitioner->do_partitioning(state, &sub_block, _mem_tracker.get());
}

auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
for (size_t i = 0; i != build_block.rows(); ++i) {
const auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
for (size_t i = 0; i != sub_block.rows(); ++i) {
partitions_indexes[channel_ids[i]].emplace_back(i);
}

Expand All @@ -176,7 +182,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta

{
SCOPED_TIMER(_partition_shuffle_timer);
Status st = partition_block->add_rows(&build_block, begin, end);
Status st = partition_block->add_rows(&sub_block, begin, end);
if (!st.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status = st;
Expand All @@ -195,8 +201,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
vectorized::MutableBlock::create_unique(build_block.clone_empty());
}
}

build_block.clear();
}

_dependency->set_ready();
Expand Down