Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>

#include <algorithm>
#include <cstdint>
#include <memory>
#include <mutex>
#include <random>
Expand All @@ -34,10 +36,12 @@
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/shuffle/exchange_writer.h"
#include "util/runtime_profile.h"
#include "util/uid_util.h"
#include "vec/columns/column_const.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/scale_writer_partitioning_exchanger.hpp"
#include "vec/sink/tablet_sink_hash_partitioner.h"

namespace doris::pipeline {
Expand Down Expand Up @@ -65,6 +69,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_distribute_rows_into_channels_timer =
ADD_TIMER(custom_profile(), "DistributeRowsIntoChannelsTime");
_send_new_partition_timer = ADD_TIMER(custom_profile(), "SendNewPartitionTime");
_add_partition_request_timer =
ADD_CHILD_TIMER(custom_profile(), "AddPartitionRequestTime", "SendNewPartitionTime");
_blocks_sent_counter =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "BlocksProduced", TUnit::UNIT, 1);
_overall_throughput = custom_profile()->add_derived_counter(
Expand Down Expand Up @@ -141,11 +147,12 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
custom_profile()->add_info_string(
"Partitioner", fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
// ExchangeOlapWriter relies on _partitioner being a TabletSinkHashPartitioner here
_partition_count = channels.size();
custom_profile()->add_info_string(
"Partitioner", fmt::format("TabletSinkHashPartitioner({})", _partition_count));
_partitioner = std::make_unique<vectorized::TabletSinkHashPartitioner>(
_partition_count, p._tablet_sink_txn_id, p._tablet_sink_schema,
cast_set<uint32_t>(_partition_count), p._tablet_sink_txn_id, p._tablet_sink_schema,
p._tablet_sink_partition, p._tablet_sink_location, p._tablet_sink_tuple_id, this);
RETURN_IF_ERROR(_partitioner->init({}));
RETURN_IF_ERROR(_partitioner->prepare(state, {}));
Expand Down Expand Up @@ -217,7 +224,11 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(Base::open(state));
_writer = std::make_unique<Writer>();
if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
_writer = std::make_unique<ExchangeOlapWriter>(*this);
} else {
_writer = std::make_unique<ExchangeTrivialWriter>(*this);
}

for (auto& channel : channels) {
RETURN_IF_ERROR(channel->open(state));
Expand Down Expand Up @@ -365,7 +376,8 @@ Status ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPt

Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)block->rows()); // for auto-partition, may decrease during do_partitioning
SCOPED_TIMER(local_state.exec_time_counter());
bool all_receiver_eof = true;
for (auto& channel : local_state.channels) {
Expand Down Expand Up @@ -488,7 +500,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!local_state.local_channel_ids.empty()) {
const auto& ids = local_state.local_channel_ids;
// Find the first channel ID >= current_channel_idx
auto it = std::lower_bound(ids.begin(), ids.end(), local_state.current_channel_idx);
auto it = std::ranges::lower_bound(ids, local_state.current_channel_idx);
if (it != ids.end()) {
local_state.current_channel_idx = *it;
} else {
Expand All @@ -501,9 +513,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
(local_state.current_channel_idx + 1) % local_state.channels.size();
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
_part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED) {
RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block, eos));
_part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
RETURN_IF_ERROR(local_state._writer->write(state, block, eos));
} else if (_part_type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED) {
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
RETURN_IF_ERROR(send_to_current_channel());
Expand Down
11 changes: 6 additions & 5 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@

#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>

#include "common/status.h"
#include "exchange_sink_buffer.h"
#include "operator.h"
#include "pipeline/shuffle/writer.h"
#include "vec/sink/scale_writer_partitioning_exchanger.hpp"
#include "pipeline/shuffle/exchange_writer.h"
#include "vec/sink/vdata_stream_sender.h"

namespace doris {
Expand Down Expand Up @@ -66,8 +66,9 @@ class ExchangeSinkLocalState MOCK_REMOVE(final) : public PipelineXSinkLocalState
if (_queue_dependency) {
dep_vec.push_back(_queue_dependency.get());
}
std::for_each(_local_channels_dependency.begin(), _local_channels_dependency.end(),
[&](std::shared_ptr<Dependency> dep) { dep_vec.push_back(dep.get()); });
std::ranges::for_each(_local_channels_dependency, [&](std::shared_ptr<Dependency> dep) {
dep_vec.push_back(dep.get());
});
return dep_vec;
}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Expand Down Expand Up @@ -168,7 +169,7 @@ class ExchangeSinkLocalState MOCK_REMOVE(final) : public PipelineXSinkLocalState
*/
std::vector<std::shared_ptr<Dependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
std::unique_ptr<Writer> _writer;
std::unique_ptr<ExchangeWriterBase> _writer;
size_t _partition_count;

std::shared_ptr<Dependency> _finish_dependency;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
}

std::vector<std::vector<uint32_t>> partition_indexes(_partition_count);
const auto* channel_ids = local_state._partitioner->get_channel_ids().get<uint32_t>();
const auto& channel_ids = local_state._partitioner->get_channel_ids();
for (uint32_t i = 0; i != rows; ++i) {
partition_indexes[channel_ids[i]].emplace_back(i);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ Status PartitionedHashJoinSinkLocalState::_execute_spill_unpartitioned_block(
(void)_partitioner->do_partitioning(state, &sub_block);
}

const auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
const auto& channel_ids = _partitioner->get_channel_ids();
for (size_t i = 0; i != sub_block.rows(); ++i) {
partitions_indexes[channel_ids[i]].emplace_back(i);
}
Expand Down Expand Up @@ -435,7 +435,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,

auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
SCOPED_TIMER(_partition_shuffle_timer);
const auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
const auto& channel_ids = _partitioner->get_channel_ids();
std::vector<std::vector<uint32_t>> partition_indexes(p._partition_count);
DCHECK_LT(begin, end);
for (size_t i = begin; i != end; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,14 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
if (state->low_memory_mode()) {
set_low_memory_mode(state);
}
SinkInfo sink_info = {.channel_id = &local_state._channel_id,
.partitioner = local_state._partitioner.get(),
.local_state = &local_state,
.shuffle_idx_to_instance_idx = &_shuffle_idx_to_instance_idx};
RETURN_IF_ERROR(local_state._exchanger->sink(
state, in_block, eos,
{local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr},
{&local_state._channel_id, local_state._partitioner.get(), &local_state,
&_shuffle_idx_to_instance_idx}));
sink_info));

// If all exchange sources ended due to limit reached, current task should also finish
if (local_state._exchanger->_running_source_operators == 0) {
Expand Down
47 changes: 26 additions & 21 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ bool Exchanger<BlockType>::_dequeue_data(BlockType& block, bool* eos, vectorized
}

Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
Profile&& profile, SinkInfo&& sink_info) {
Profile&& profile, SinkInfo& sink_info) {
if (in_block->empty()) {
return Status::OK();
}
Expand All @@ -123,8 +123,8 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
}
{
SCOPED_TIMER(profile.distribute_timer);
RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids().get<uint32_t>(),
in_block, *sink_info.channel_id, sink_info.local_state,
RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids(), in_block,
*sink_info.channel_id, sink_info.local_state,
sink_info.shuffle_idx_to_instance_idx));
}

Expand Down Expand Up @@ -172,7 +172,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
return Status::OK();
}

Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
Status ShuffleExchanger::_split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids,
vectorized::Block* block, int channel_id,
LocalExchangeSinkLocalState* local_state,
std::map<int, int>* shuffle_idx_to_instance_idx) {
Expand Down Expand Up @@ -223,8 +223,10 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
uint32_t size = partition_rows_histogram[it.first + 1] - start;
if (size > 0) {
enqueue_rows += size;
_enqueue_data_and_set_ready(it.second, local_state,
{new_block_wrapper, {row_idx, start, size}});
_enqueue_data_and_set_ready(
it.second, local_state,
{new_block_wrapper,
{.row_idxs = row_idx, .offset_start = start, .length = size}});
}
}
if (enqueue_rows != rows) [[unlikely]] {
Expand All @@ -243,7 +245,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
return Status::OK();
}

Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
Status ShuffleExchanger::_split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids,
vectorized::Block* block, int channel_id) {
const auto rows = cast_set<int32_t>(block->rows());
auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
Expand Down Expand Up @@ -276,15 +278,17 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
uint32_t start = partition_rows_histogram[i];
uint32_t size = partition_rows_histogram[i + 1] - start;
if (size > 0) {
_enqueue_data_and_set_ready(i, {new_block_wrapper, {row_idx, start, size}});
_enqueue_data_and_set_ready(
i, {new_block_wrapper,
{.row_idxs = row_idx, .offset_start = start, .length = size}});
}
}

return Status::OK();
}

Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
Profile&& profile, SinkInfo&& sink_info) {
Profile&& profile, SinkInfo& sink_info) {
if (in_block->empty()) {
return Status::OK();
}
Expand Down Expand Up @@ -336,7 +340,7 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b
}

Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
Profile&& profile, SinkInfo&& sink_info) {
Profile&& profile, SinkInfo& sink_info) {
if (in_block->empty()) {
return Status::OK();
}
Expand Down Expand Up @@ -377,7 +381,7 @@ void ExchangerBase::finalize() {
}

Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
Profile&& profile, SinkInfo&& sink_info) {
Profile&& profile, SinkInfo& sink_info) {
if (in_block->empty()) {
return Status::OK();
}
Expand All @@ -390,8 +394,9 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
std::move(new_block),
sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, -1);
for (int i = 0; i < _num_partitions; i++) {
_enqueue_data_and_set_ready(i, sink_info.local_state,
{wrapper, {0, wrapper->_data_block.rows()}});
_enqueue_data_and_set_ready(
i, sink_info.local_state,
{wrapper, {.offset_start = 0, .length = wrapper->_data_block.rows()}});
}

return Status::OK();
Expand Down Expand Up @@ -429,7 +434,7 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo

Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
vectorized::Block* in_block,
SinkInfo&& sink_info) {
SinkInfo& sink_info) {
vectorized::Block new_block;
if (!_free_blocks.try_dequeue(new_block)) {
new_block = {in_block->clone_empty()};
Expand All @@ -449,7 +454,7 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
}

Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectorized::Block* block,
SinkInfo&& sink_info) {
SinkInfo& sink_info) {
std::vector<uint32_t> channel_ids;
const auto num_rows = block->rows();
channel_ids.resize(num_rows, 0);
Expand All @@ -467,13 +472,13 @@ Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz

sink_info.local_state->_memory_used_counter->set(
sink_info.local_state->_shared_state->mem_usage);
RETURN_IF_ERROR(_split_rows(state, channel_ids.data(), block, std::move(sink_info)));
RETURN_IF_ERROR(_split_rows(state, channel_ids, block, sink_info));
return Status::OK();
}

Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
const uint32_t* __restrict channel_ids,
vectorized::Block* block, SinkInfo&& sink_info) {
const std::vector<uint32_t>& channel_ids,
vectorized::Block* block, SinkInfo& sink_info) {
const auto rows = cast_set<int32_t>(block->rows());
auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
auto& partition_rows_histogram = _partition_rows_histogram[*sink_info.channel_id];
Expand Down Expand Up @@ -512,17 +517,17 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
}

Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
bool eos, Profile&& profile, SinkInfo&& sink_info) {
bool eos, Profile&& profile, SinkInfo& sink_info) {
if (in_block->empty()) {
return Status::OK();
}
if (_is_pass_through) {
return _passthrough_sink(state, in_block, std::move(sink_info));
return _passthrough_sink(state, in_block, sink_info);
} else {
if (++_total_block >= _num_partitions) {
_is_pass_through = true;
}
return _shuffle_sink(state, in_block, std::move(sink_info));
return _shuffle_sink(state, in_block, sink_info);
}
}

Expand Down
Loading
Loading