diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 0b9bd48faf1800..7887628b7fa476 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -518,7 +518,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* vectorized::MutableBlock::build_mutable_block(&tmp_build_block); } - if (in_block->rows() != 0) { + if (!in_block->empty()) { std::vector res_col_ids(_build_expr_ctxs.size()); RETURN_IF_ERROR(local_state._do_evaluate(*in_block, local_state._build_expr_ctxs, *local_state._build_expr_call_timer, @@ -533,27 +533,15 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._mem_tracker->consume(in_block->bytes()); COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes()); - local_state._build_blocks.emplace_back(std::move(*in_block)); + + SCOPED_TIMER(local_state._build_side_merge_block_timer); + RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow( + std::move(*in_block))); } } if (local_state._should_build_hash_table && eos) { DCHECK(!local_state._build_side_mutable_block.empty()); - - for (auto& column : local_state._build_side_mutable_block.mutable_columns()) { - column->reserve(local_state._build_side_rows); - } - - { - SCOPED_TIMER(local_state._build_side_merge_block_timer); - for (auto& block : local_state._build_blocks) { - RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(block)); - - vectorized::Block temp; - std::swap(block, temp); - } - } - local_state._shared_state->build_block = std::make_shared( local_state._build_side_mutable_block.to_block()); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index fad03f0a78d518..f6681548dbef0f 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -78,7 +78,6 @@ class HashJoinBuildSinkLocalState final int64_t _build_side_last_mem_used = 0; size_t _build_side_rows = 0; - std::vector _build_blocks; vectorized::MutableBlock _build_side_mutable_block; std::shared_ptr _runtime_filter_slots; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 4aa0bd42a84297..1aeb9213d83ee7 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -100,30 +100,33 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state } Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) { - _shared_state->need_to_spill = true; auto& p = _parent->cast(); _shared_state->inner_shared_state->hash_table_variants.reset(); auto row_desc = p._child_x->row_desc(); const auto num_slots = row_desc.num_slots(); - std::vector build_blocks; + vectorized::Block build_block; auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state(); if (inner_sink_state_) { auto inner_sink_state = assert_cast(inner_sink_state_); - build_blocks = std::move(inner_sink_state->_build_blocks); + build_block = inner_sink_state->_build_side_mutable_block.to_block(); } - if (build_blocks.empty()) { + if (build_block.rows() <= 1) { LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id() << ", task: " << state->task_id(); return Status::OK(); } - auto spill_func = [build_blocks = std::move(build_blocks), state, num_slots, this]() mutable { + if (build_block.columns() > num_slots) { + build_block.erase(num_slots); + } + + auto spill_func = [build_block = std::move(build_block), state, this]() mutable { auto& p = _parent->cast(); auto& partitioned_blocks = _shared_state->partitioned_build_blocks; std::vector> partitions_indexes(p._partition_count); - const auto reserved_size = 4096; + const size_t reserved_size = 4096; std::for_each(partitions_indexes.begin(), partitions_indexes.end(), [](std::vector& indices) { indices.reserve(reserved_size); }); @@ -142,24 +145,27 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta return true; }; - for (size_t block_idx = 0; block_idx != build_blocks.size(); ++block_idx) { - auto& build_block = build_blocks[block_idx]; - const auto is_last_block = (block_idx == (build_blocks.size() - 1)); - if (UNLIKELY(build_block.empty())) { - continue; - } + size_t total_rows = build_block.rows(); + size_t offset = 1; + while (offset < total_rows) { + auto sub_block = build_block.clone_empty(); + size_t this_run = std::min(reserved_size, total_rows - offset); - if (build_block.columns() > num_slots) { - build_block.erase(num_slots); + for (size_t i = 0; i != build_block.columns(); ++i) { + sub_block.get_by_position(i).column = + build_block.get_by_position(i).column->cut(offset, this_run); } + offset += this_run; + const auto is_last_block = offset == total_rows; + { SCOPED_TIMER(_partition_timer); - (void)_partitioner->do_partitioning(state, &build_block, _mem_tracker.get()); + (void)_partitioner->do_partitioning(state, &sub_block, _mem_tracker.get()); } - auto* channel_ids = _partitioner->get_channel_ids().get(); - for (size_t i = 0; i != build_block.rows(); ++i) { + const auto* channel_ids = _partitioner->get_channel_ids().get(); + for (size_t i = 0; i != sub_block.rows(); ++i) { partitions_indexes[channel_ids[i]].emplace_back(i); } @@ -176,7 +182,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta { SCOPED_TIMER(_partition_shuffle_timer); - Status st = partition_block->add_rows(&build_block, begin, end); + Status st = partition_block->add_rows(&sub_block, begin, end); if (!st.ok()) { std::unique_lock lock(_spill_lock); _spill_status = st; @@ -195,8 +201,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta vectorized::MutableBlock::create_unique(build_block.clone_empty()); } } - - build_block.clear(); } _dependency->set_ready();