Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
auto& p = _parent->cast<ExchangeSinkOperatorX>();
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
std::random_device rd;
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
}
int local_size = 0;
for (int i = 0; i < channels.size(); ++i) {
RETURN_IF_ERROR(channels[i]->open(state));
if (channels[i]->is_local()) {
local_size++;
_last_local_channel_idx = i;
}
}
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
std::random_device rd;
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
}
only_local_exchange = local_size == channels.size();

PUniqueId id;
Expand Down Expand Up @@ -387,11 +388,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (local_state.only_local_exchange) {
if (!block->empty()) {
Status status;
size_t idx = 0;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
status = channel->send_local_block(block);
// If this channel is the last, we can move this block to downstream pipeline.
// Otherwise, this block also need to be broadcasted to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
block, idx == local_state._last_local_channel_idx);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
idx++;
}
}
} else {
Expand All @@ -414,17 +421,23 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block

local_state._broadcast_pb_mem_limiter->acquire(*block_holder);

size_t idx = 0;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
status = channel->send_local_block(&cur_block);
// If this channel is the last, we can move this block to downstream pipeline.
// Otherwise, this block also need to be broadcasted to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
&cur_block, idx == local_state._last_local_channel_idx);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
idx++;
}
cur_block.clear_column_data();
local_state._serializer.get_block()->set_mutable_columns(
Expand All @@ -439,7 +452,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block);
auto status = current_channel->send_local_block(block, false);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down Expand Up @@ -525,7 +538,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block);
auto status = current_channel->send_local_block(block, false);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
}

template <typename Parent>
Status Channel<Parent>::send_local_block(Block* block) {
Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
SCOPED_TIMER(_parent->local_send_timer());
if (_recvr_is_valid()) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
_local_recvr->add_block(block, _parent->sender_id(), false);
_local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
return Status::OK();
} else {
return _receiver_status;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class Channel {

Status send_local_block(Status exec_status, bool eos = false);

Status send_local_block(Block* block);
Status send_local_block(Block* block, bool can_be_moved);
// Flush buffered rows and close channel. This function don't wait the response
// of close operation, client should call close_wait() to finish channel's close.
// We split one close operation into two phases in order to make multiple channels
Expand Down