diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 518620ba6b4d3e..4d8867e61aec87 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -114,19 +114,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { auto& p = _parent->cast(); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || + _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { + std::random_device rd; + std::mt19937 g(rd()); + shuffle(channels.begin(), channels.end(), g); + } int local_size = 0; for (int i = 0; i < channels.size(); ++i) { RETURN_IF_ERROR(channels[i]->open(state)); if (channels[i]->is_local()) { local_size++; + _last_local_channel_idx = i; } } - if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || - _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { - std::random_device rd; - std::mt19937 g(rd()); - shuffle(channels.begin(), channels.end(), g); - } only_local_exchange = local_size == channels.size(); PUniqueId id; @@ -387,11 +388,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (local_state.only_local_exchange) { if (!block->empty()) { Status status; + size_t idx = 0; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { - status = channel->send_local_block(block); + // If this channel is the last, we can move this block to downstream pipeline. + // Otherwise, this block also need to be broadcasted to other channels so should be copied. + DCHECK_GE(local_state._last_local_channel_idx, 0); + status = channel->send_local_block( + block, idx == local_state._last_local_channel_idx); HANDLE_CHANNEL_STATUS(state, channel, status); } + idx++; } } } else { @@ -414,17 +421,23 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block local_state._broadcast_pb_mem_limiter->acquire(*block_holder); + size_t idx = 0; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { Status status; if (channel->is_local()) { - status = channel->send_local_block(&cur_block); + // If this channel is the last, we can move this block to downstream pipeline. + // Otherwise, this block also need to be broadcasted to other channels so should be copied. + DCHECK_GE(local_state._last_local_channel_idx, 0); + status = channel->send_local_block( + &cur_block, idx == local_state._last_local_channel_idx); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(block_holder, eos); } HANDLE_CHANNEL_STATUS(state, channel, status); } + idx++; } cur_block.clear_column_data(); local_state._serializer.get_block()->set_mutable_columns( @@ -439,7 +452,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block); + auto status = current_channel->send_local_block(block, false); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -525,7 +538,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block); + auto status = current_channel->send_local_block(block, false); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index adf8a3424706d2..c60cefabfa8380 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -202,6 +202,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { // for external table sink hash partition std::unique_ptr _partition_function = nullptr; std::atomic _reach_limit = false; + int _last_local_channel_idx = -1; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index fb2f24ee0e1817..1f24ba122e7e3f 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -210,7 +210,7 @@ Status Channel::send_local_block(Status exec_status, bool eos) { } template -Status Channel::send_local_block(Block* block) { +Status Channel::send_local_block(Block* block, bool can_be_moved) { SCOPED_TIMER(_parent->local_send_timer()); if (_recvr_is_valid()) { if constexpr (!std::is_same_v) { @@ -218,7 +218,7 @@ Status Channel::send_local_block(Block* block) { COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); } - _local_recvr->add_block(block, _parent->sender_id(), false); + _local_recvr->add_block(block, _parent->sender_id(), can_be_moved); return Status::OK(); } else { return _receiver_status; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 7c86a62519a851..b046c2efafcddf 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -156,7 +156,7 @@ class Channel { Status send_local_block(Status exec_status, bool eos = false); - Status send_local_block(Block* block); + Status send_local_block(Block* block, bool can_be_moved); // Flush buffered rows and close channel. This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. // We split one close operation into two phases in order to make multiple channels