From 0827bd06daaeb678b6b4c5e864f0541ef4701809 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 11 Oct 2024 11:21:23 +0800 Subject: [PATCH 1/2] update --- be/src/pipeline/exec/exchange_sink_operator.cpp | 9 +++++---- be/src/vec/sink/vdata_stream_sender.cpp | 4 ++-- be/src/vec/sink/vdata_stream_sender.h | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 518620ba6b4d3e..c2247f4c37f155 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -389,7 +389,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block Status status; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { - status = channel->send_local_block(block); + status = channel->send_local_block(block, local_state.channels.size() != 1); HANDLE_CHANNEL_STATUS(state, channel, status); } } @@ -418,7 +418,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!channel->is_receiver_eof()) { Status status; if (channel->is_local()) { - status = channel->send_local_block(&cur_block); + status = channel->send_local_block( + &cur_block, local_state.channels.size() != 1); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(block_holder, eos); @@ -439,7 +440,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. 
serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block); + auto status = current_channel->send_local_block(block, false); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -525,7 +526,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block); + auto status = current_channel->send_local_block(block, false); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index fb2f24ee0e1817..5e020bc2cac6ef 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -210,7 +210,7 @@ Status Channel::send_local_block(Status exec_status, bool eos) { } template -Status Channel::send_local_block(Block* block) { +Status Channel::send_local_block(Block* block, bool is_broadcast) { SCOPED_TIMER(_parent->local_send_timer()); if (_recvr_is_valid()) { if constexpr (!std::is_same_v) { @@ -218,7 +218,7 @@ Status Channel::send_local_block(Block* block) { COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); } - _local_recvr->add_block(block, _parent->sender_id(), false); + _local_recvr->add_block(block, _parent->sender_id(), !is_broadcast); return Status::OK(); } else { return _receiver_status; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 7c86a62519a851..7ea710ed790a2c 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -156,7 +156,7 @@ class Channel { Status send_local_block(Status exec_status, bool eos = false); 
- Status send_local_block(Block* block); + Status send_local_block(Block* block, bool is_broadcast); // Flush buffered rows and close channel. This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. // We split one close operation into two phases in order to make multiple channels From bd85447159141deb697f2958cef258a369b49b0e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 11 Oct 2024 11:38:19 +0800 Subject: [PATCH 2/2] update --- .../pipeline/exec/exchange_sink_operator.cpp | 28 +++++++++++++------ be/src/pipeline/exec/exchange_sink_operator.h | 1 + be/src/vec/sink/vdata_stream_sender.cpp | 4 +-- be/src/vec/sink/vdata_stream_sender.h | 2 +- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index c2247f4c37f155..4d8867e61aec87 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -114,19 +114,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { auto& p = _parent->cast(); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || + _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { + std::random_device rd; + std::mt19937 g(rd()); + shuffle(channels.begin(), channels.end(), g); + } int local_size = 0; for (int i = 0; i < channels.size(); ++i) { RETURN_IF_ERROR(channels[i]->open(state)); if (channels[i]->is_local()) { local_size++; + _last_local_channel_idx = i; } } - if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || - _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { - std::random_device rd; - std::mt19937 g(rd()); - shuffle(channels.begin(), channels.end(), g); - } only_local_exchange = local_size == channels.size(); PUniqueId id; @@ -387,11 +388,17 @@ Status 
ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (local_state.only_local_exchange) { if (!block->empty()) { Status status; + size_t idx = 0; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { - status = channel->send_local_block(block, local_state.channels.size() != 1); + // If this channel is the last local one, we can move this block to the downstream pipeline. + // Otherwise, this block still needs to be broadcast to other channels, so it must be copied. + DCHECK_GE(local_state._last_local_channel_idx, 0); + status = channel->send_local_block( + block, idx == local_state._last_local_channel_idx); HANDLE_CHANNEL_STATUS(state, channel, status); } + idx++; } } } else { @@ -414,18 +421,23 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block local_state._broadcast_pb_mem_limiter->acquire(*block_holder); + size_t idx = 0; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { Status status; if (channel->is_local()) { + // If this channel is the last local one, we can move this block to the downstream pipeline. + // Otherwise, this block still needs to be broadcast to other channels, so it must be copied.
+ DCHECK_GE(local_state._last_local_channel_idx, 0); status = channel->send_local_block( - &cur_block, local_state.channels.size() != 1); + &cur_block, idx == local_state._last_local_channel_idx); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(block_holder, eos); } HANDLE_CHANNEL_STATUS(state, channel, status); } + idx++; } cur_block.clear_column_data(); local_state._serializer.get_block()->set_mutable_columns( diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index adf8a3424706d2..c60cefabfa8380 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -202,6 +202,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { // for external table sink hash partition std::unique_ptr _partition_function = nullptr; std::atomic _reach_limit = false; + int _last_local_channel_idx = -1; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 5e020bc2cac6ef..1f24ba122e7e3f 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -210,7 +210,7 @@ Status Channel::send_local_block(Status exec_status, bool eos) { } template -Status Channel::send_local_block(Block* block, bool is_broadcast) { +Status Channel::send_local_block(Block* block, bool can_be_moved) { SCOPED_TIMER(_parent->local_send_timer()); if (_recvr_is_valid()) { if constexpr (!std::is_same_v) { @@ -218,7 +218,7 @@ Status Channel::send_local_block(Block* block, bool is_broadcast) { COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); } - _local_recvr->add_block(block, _parent->sender_id(), !is_broadcast); + _local_recvr->add_block(block, _parent->sender_id(), can_be_moved); return Status::OK(); } else { return _receiver_status; 
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 7ea710ed790a2c..b046c2efafcddf 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -156,7 +156,7 @@ class Channel { Status send_local_block(Status exec_status, bool eos = false); - Status send_local_block(Block* block, bool is_broadcast); + Status send_local_block(Block* block, bool can_be_moved); // Flush buffered rows and close channel. This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. // We split one close operation into two phases in order to make multiple channels