From 45e6cc2a6423688221f6da5e1d356f49ac93db45 Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Tue, 9 Apr 2024 03:02:30 +0800 Subject: [PATCH 1/3] Init commit. --- be/src/common/config.cpp | 13 +-- be/src/common/config.h | 8 +- .../pipeline/exec/exchange_sink_operator.cpp | 28 +++++-- .../pipeline_x_fragment_context.cpp | 1 + be/src/runtime/runtime_state.h | 5 ++ .../vec/exec/skewed_partition_rebalancer.cpp | 79 +++++++++++++++---- be/src/vec/exec/skewed_partition_rebalancer.h | 17 ++-- .../scale_writer_partitioning_exchanger.hpp | 7 +- 8 files changed, 118 insertions(+), 40 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f29ec307914eeb..212233dcbae6ee 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1181,14 +1181,15 @@ DEFINE_mString(ca_cert_file_paths, "/etc/ssl/ca-bundle.pem"); /** Table sink configurations(currently contains only external table types) **/ -// Minimum data processed to scale writers when non partition writing +// Minimum data processed to scale writers in exchange when non partition writing DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold, - "125829120"); // 120MB -// Minimum data processed to start rebalancing in exchange when partition writing -DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB + "26214400"); // 25MB // Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing -DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold, - "209715200"); // 200MB +DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold, + "26214400"); // 25MB +// Minimum partition data processed to rebalance writers in exchange when partition writing +DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold, + "15728640"); // 15MB // Maximum processed partition nums of per writer when partition writing DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 81fabfb9517879..b6b7e0c1d3660f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1265,12 +1265,12 @@ DECLARE_String(trino_connector_plugin_dir); DECLARE_mString(ca_cert_file_paths); /** Table sink configurations(currently contains only external table types) **/ -// Minimum data processed to scale writers when non partition writing +// Minimum data processed to scale writers in exchange when non partition writing DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold); -// Minimum data processed to start rebalancing in exchange when partition writing -DECLARE_mInt64(table_sink_partition_write_data_processed_threshold); // Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing -DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold); +DECLARE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold); +// Minimum partition data processed to rebalance writers in exchange when partition writing +DECLARE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold); // Maximum processed partition nums of per writer when partition writing DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index bc55bc8f805803..e1ca46dcdcc4c3 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -257,17 +257,31 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _partitioner.reset( new vectorized::Crc32HashPartitioner(_partition_count)); _partition_function.reset(new HashPartitionFunction(_partitioner.get())); - // const long MEGABYTE = 1024 * 1024; - // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 10000 * MEGABYTE; // 1MB - // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50000 * MEGABYTE; // 50MB - // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 1MB - // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 50MB + std::vector task_addresses; + task_addresses.reserve(channels.size()); + for (int i = 0; i < channels.size(); ++i) { + const TNetworkAddress& brpc_dest_addr = channels[i]->brpc_dest_addr(); + task_addresses.emplace_back( + fmt::format("{}:{}", brpc_dest_addr.hostname, brpc_dest_addr.port)); + } scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger< HashPartitionFunction>( channels.size(), *_partition_function, _partition_count, channels.size(), 1, - config::table_sink_partition_write_data_processed_threshold, - config::table_sink_partition_write_skewed_data_processed_rebalance_threshold)); + config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / + state->task_num() == + 0 + ? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold + : config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / + state->task_num(), + config::table_sink_partition_write_min_data_processed_rebalance_threshold / + state->task_num() == + 0 + ? config::table_sink_partition_write_min_data_processed_rebalance_threshold + : config::table_sink_partition_write_min_data_processed_rebalance_threshold / + state->task_num(), + &task_addresses)); + RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); _profile->add_info_string("Partitioner", diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ce2d4a507487eb..f23e39472ab993 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -593,6 +593,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( init_runtime_state(task_runtime_state); auto cur_task_id = _total_tasks++; task_runtime_state->set_task_id(cur_task_id); + task_runtime_state->set_task_num(pipeline->num_tasks()); auto task = std::make_unique( pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this, pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 6fae242d53cd95..85413406fd2978 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -624,6 +624,10 @@ class RuntimeState { int task_id() const { return _task_id; } + void set_task_num(int task_num) { _task_num = task_num; } + + int task_num() const { return _task_num; } + private: Status create_error_log_file(); @@ -734,6 +738,7 @@ class RuntimeState { std::vector _error_tablet_infos; int _max_operator_id = 0; int _task_id = -1; + int _task_num = 0; std::vector _hive_partition_updates; diff --git a/be/src/vec/exec/skewed_partition_rebalancer.cpp b/be/src/vec/exec/skewed_partition_rebalancer.cpp index ae12d365f057dc..a56a35eab9bf34 100644 --- a/be/src/vec/exec/skewed_partition_rebalancer.cpp +++ b/be/src/vec/exec/skewed_partition_rebalancer.cpp @@ -28,7 +28,7 @@ namespace doris::vectorized { SkewedPartitionRebalancer::SkewedPartitionRebalancer( int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold) + long min_data_processed_rebalance_threshold, const std::vector* task_addresses) : _partition_count(partition_count), _task_count(task_count), _task_bucket_count(task_bucket_count), @@ -45,13 +45,40 @@ SkewedPartitionRebalancer::SkewedPartitionRebalancer( _partition_data_size_since_last_rebalance_per_task(partition_count, 0), _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), _partition_assignments(partition_count) { + if (task_addresses != nullptr) { + CHECK(task_addresses->size() == task_count); + _task_addresses = *task_addresses; + for (int i = 0; i < _task_addresses.size(); ++i) { + auto it = _assigned_address_to_task_buckets_num.find(_task_addresses[i]); + if (it == _assigned_address_to_task_buckets_num.end()) { + _assigned_address_to_task_buckets_num.insert({_task_addresses[i], 0}); + } + } + } else { + _assigned_address_to_task_buckets_num.insert({TASK_BUCKET_ADDRESS_NOT_SET, 0}); + } + std::vector task_bucket_ids(task_count, 0); for (int partition = 0; partition < partition_count; partition++) { int task_id = partition % task_count; int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; - TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + TaskBucket task_bucket( + task_id, bucket_id, task_bucket_count, + (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET : _task_addresses[task_id]); _partition_assignments[partition].emplace_back(std::move(task_bucket)); + + for (int i = 0; i < _partition_assignments[partition].size(); ++i) { + auto it = _assigned_address_to_task_buckets_num.find( + _partition_assignments[partition][i].task_address); + if (it != _assigned_address_to_task_buckets_num.end()) { + _assigned_address_to_task_buckets_num[_partition_assignments[partition][i] + .task_address]++; + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + } } } @@ -140,7 +167,7 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( continue; } - std::vector min_skewed_task_buckets = + std::multimap min_skewed_task_buckets = _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); if (min_skewed_task_buckets.empty()) { break; @@ -161,10 +188,27 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( int total_assigned_tasks = _partition_assignments[max_partition_value].size(); if (_partition_data_size[max_partition_value] >= (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { - for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { - if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, - min_task_buckets)) { - scaled_partitions.push_back(max_partition_value); + bool found = false; + std::vector> + sorted_assigned_address_to_task_buckets_num( + _assigned_address_to_task_buckets_num.begin(), + _assigned_address_to_task_buckets_num.end()); + + std::sort(sorted_assigned_address_to_task_buckets_num.begin(), + sorted_assigned_address_to_task_buckets_num.end(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + for (auto& pair : sorted_assigned_address_to_task_buckets_num) { + auto range = min_skewed_task_buckets.equal_range(pair.first); + for (auto it = range.first; it != range.second; ++it) { + TaskBucket& task_bucket = it->second; + if (_rebalance_partition(max_partition_value, task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + found = true; + break; + } + } + if (found) { break; } } @@ -175,12 +219,12 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( } } -std::vector +std::multimap SkewedPartitionRebalancer::_find_skewed_min_task_buckets( const TaskBucket& max_task_bucket, const IndexedPriorityQueue& min_task_buckets) { - std::vector min_skewed_task_buckets; + std::multimap min_skewed_task_buckets; for (const auto& min_task_bucket : min_task_buckets) { double skewness = @@ -192,7 +236,7 @@ SkewedPartitionRebalancer::_find_skewed_min_task_buckets( break; } if (max_task_bucket.task_id != min_task_bucket.task_id) { - min_skewed_task_buckets.push_back(min_task_bucket); + min_skewed_task_buckets.insert({min_task_bucket.task_address, min_task_bucket}); } } return min_skewed_task_buckets; @@ -213,6 +257,7 @@ bool SkewedPartitionRebalancer::_rebalance_partition( } assignments.push_back(to_task_bucket); + _assigned_address_to_task_buckets_num[to_task_bucket.task_address]++; int new_task_count = assignments.size(); int old_task_count = new_task_count - 1; @@ -279,10 +324,14 @@ void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { IndexedPriorityQueue min_task_buckets; - for (int taskId = 0; taskId < _task_count; taskId++) { - for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) { - TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count); - TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count); + for (int task_id = 0; task_id < _task_count; task_id++) { + for (int bucket_id = 0; bucket_id < _task_bucket_count; bucket_id++) { + TaskBucket task_bucket1(task_id, bucket_id, _task_bucket_count, + (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET + : _task_addresses[task_id]); + TaskBucket task_bucket2(task_id, bucket_id, _task_bucket_count, + (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET + : _task_addresses[task_id]); _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] = _calculate_task_bucket_data_size_since_last_rebalance( task_bucket_max_partitions[task_bucket1.id]); @@ -299,4 +348,4 @@ void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { task_bucket_max_partitions); _data_processed_at_last_rebalance = data_processed; } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/exec/skewed_partition_rebalancer.h b/be/src/vec/exec/skewed_partition_rebalancer.h index 814ebc1d465ca1..57b4e067718f22 100644 --- a/be/src/vec/exec/skewed_partition_rebalancer.h +++ b/be/src/vec/exec/skewed_partition_rebalancer.h @@ -60,9 +60,12 @@ class SkewedPartitionRebalancer { struct TaskBucket { int task_id; int id; + std::string task_address; - TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_) - : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {} + TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_, std::string task_address_) + : task_id(task_id_), + id(task_id_ * task_bucket_count_ + bucket_id_), + task_address(task_address_) {} bool operator==(const TaskBucket& other) const { return id == other.id; } @@ -74,7 +77,8 @@ class SkewedPartitionRebalancer { public: SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold); + long min_data_processed_rebalance_threshold, + const std::vector* task_addresses = nullptr); std::vector> get_partition_assignments(); int get_task_count(); @@ -96,7 +100,7 @@ class SkewedPartitionRebalancer { std::vector< IndexedPriorityQueue>& task_bucket_max_partitions); - std::vector _find_skewed_min_task_buckets( + std::multimap _find_skewed_min_task_buckets( const TaskBucket& max_task_bucket, const IndexedPriorityQueue& @@ -113,12 +117,14 @@ class SkewedPartitionRebalancer { private: static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7; + static constexpr const char* TASK_BUCKET_ADDRESS_NOT_SET = "TASK_BUCKET_ADDRESS_NOT_SET"; int _partition_count; int _task_count; int _task_bucket_count; long _min_partition_data_processed_rebalance_threshold; long _min_data_processed_rebalance_threshold; + std::vector _task_addresses; std::vector _partition_row_count; long _data_processed; long _data_processed_at_last_rebalance; @@ -128,5 +134,6 @@ class SkewedPartitionRebalancer { std::vector _estimated_task_bucket_data_size_since_last_rebalance; std::vector> _partition_assignments; + std::unordered_map _assigned_address_to_task_buckets_num; }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp index f7435249c20f64..ec9c4163690d20 100644 --- a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp +++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp @@ -33,12 +33,13 @@ class ScaleWriterPartitioningExchanger { ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& partition_function, int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold) + long min_data_processed_rebalance_threshold, + const std::vector* task_addresses) : _channel_size(channel_size), _partition_function(partition_function), _partition_rebalancer(partition_count, task_count, task_bucket_count, min_partition_data_processed_rebalance_threshold, - min_data_processed_rebalance_threshold), + min_data_processed_rebalance_threshold, task_addresses), _partition_row_counts(partition_count, 0), _partition_writer_ids(partition_count, -1), _partition_writer_indexes(partition_count, 0) {} @@ -89,4 +90,4 @@ class ScaleWriterPartitioningExchanger { std::vector _partition_writer_indexes; }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized From 5ed8c4f265af9e225ef7b0d4a62b6702e596dfa7 Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Tue, 9 Apr 2024 09:31:21 +0800 Subject: [PATCH 2/3] Fix compiling issue. --- be/src/vec/exec/skewed_partition_rebalancer.cpp | 2 ++ be/src/vec/exec/skewed_partition_rebalancer.h | 4 +++- be/src/vec/sink/vdata_stream_sender.h | 2 ++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/be/src/vec/exec/skewed_partition_rebalancer.cpp b/be/src/vec/exec/skewed_partition_rebalancer.cpp index a56a35eab9bf34..7332d00e32c079 100644 --- a/be/src/vec/exec/skewed_partition_rebalancer.cpp +++ b/be/src/vec/exec/skewed_partition_rebalancer.cpp @@ -20,6 +20,8 @@ #include "vec/exec/skewed_partition_rebalancer.h" +#include + #include #include diff --git a/be/src/vec/exec/skewed_partition_rebalancer.h b/be/src/vec/exec/skewed_partition_rebalancer.h index 57b4e067718f22..d89abd041e88b1 100644 --- a/be/src/vec/exec/skewed_partition_rebalancer.h +++ b/be/src/vec/exec/skewed_partition_rebalancer.h @@ -49,6 +49,7 @@ #include #include #include +#include #include #include "util/indexed_priority_queue.hpp" @@ -62,7 +63,8 @@ class SkewedPartitionRebalancer { int id; std::string task_address; - TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_, std::string task_address_) + TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_, + const std::string& task_address_) : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_), task_address(task_address_) {} diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 8119b5a35f9772..1886243718d8e1 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -342,6 +342,8 @@ class Channel { void set_receiver_eof(Status st) { _receiver_status = st; } + const TNetworkAddress& brpc_dest_addr() const { return _brpc_dest_addr; } + protected: bool _recvr_is_valid() { if (_local_recvr && !_local_recvr->is_closed()) { From 90ad364b3cb4ec9e708ad54dac01e6afdf6c3ac3 Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Tue, 9 Apr 2024 15:59:35 +0800 Subject: [PATCH 3/3] remote task address facility. --- .../pipeline/exec/exchange_sink_operator.cpp | 10 +-- .../vec/exec/skewed_partition_rebalancer.cpp | 81 ++++--------------- be/src/vec/exec/skewed_partition_rebalancer.h | 19 ++--- .../scale_writer_partitioning_exchanger.hpp | 7 +- be/src/vec/sink/vdata_stream_sender.h | 2 - 5 files changed, 24 insertions(+), 95 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e1ca46dcdcc4c3..8323e20cfd17c1 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -258,13 +258,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf new vectorized::Crc32HashPartitioner(_partition_count)); _partition_function.reset(new HashPartitionFunction(_partitioner.get())); - std::vector task_addresses; - task_addresses.reserve(channels.size()); - for (int i = 0; i < channels.size(); ++i) { - const TNetworkAddress& brpc_dest_addr = channels[i]->brpc_dest_addr(); - task_addresses.emplace_back( - fmt::format("{}:{}", brpc_dest_addr.hostname, brpc_dest_addr.port)); - } scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger< HashPartitionFunction>( channels.size(), *_partition_function, _partition_count, channels.size(), 1, @@ -279,8 +272,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf 0 ? config::table_sink_partition_write_min_data_processed_rebalance_threshold : config::table_sink_partition_write_min_data_processed_rebalance_threshold / - state->task_num(), - &task_addresses)); + state->task_num())); RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); diff --git a/be/src/vec/exec/skewed_partition_rebalancer.cpp b/be/src/vec/exec/skewed_partition_rebalancer.cpp index 7332d00e32c079..ae12d365f057dc 100644 --- a/be/src/vec/exec/skewed_partition_rebalancer.cpp +++ b/be/src/vec/exec/skewed_partition_rebalancer.cpp @@ -20,8 +20,6 @@ #include "vec/exec/skewed_partition_rebalancer.h" -#include - #include #include @@ -30,7 +28,7 @@ namespace doris::vectorized { SkewedPartitionRebalancer::SkewedPartitionRebalancer( int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold, const std::vector* task_addresses) + long min_data_processed_rebalance_threshold) : _partition_count(partition_count), _task_count(task_count), _task_bucket_count(task_bucket_count), @@ -47,40 +45,13 @@ SkewedPartitionRebalancer::SkewedPartitionRebalancer( _partition_data_size_since_last_rebalance_per_task(partition_count, 0), _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), _partition_assignments(partition_count) { - if (task_addresses != nullptr) { - CHECK(task_addresses->size() == task_count); - _task_addresses = *task_addresses; - for (int i = 0; i < _task_addresses.size(); ++i) { - auto it = _assigned_address_to_task_buckets_num.find(_task_addresses[i]); - if (it == _assigned_address_to_task_buckets_num.end()) { - _assigned_address_to_task_buckets_num.insert({_task_addresses[i], 0}); - } - } - } else { - _assigned_address_to_task_buckets_num.insert({TASK_BUCKET_ADDRESS_NOT_SET, 0}); - } - std::vector task_bucket_ids(task_count, 0); for (int partition = 0; partition < partition_count; partition++) { int task_id = partition % task_count; int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; - TaskBucket task_bucket( - task_id, bucket_id, task_bucket_count, - (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET : _task_addresses[task_id]); + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); _partition_assignments[partition].emplace_back(std::move(task_bucket)); - - for (int i = 0; i < _partition_assignments[partition].size(); ++i) { - auto it = _assigned_address_to_task_buckets_num.find( - _partition_assignments[partition][i].task_address); - if (it != _assigned_address_to_task_buckets_num.end()) { - _assigned_address_to_task_buckets_num[_partition_assignments[partition][i] - .task_address]++; - } else { - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - } - } } } @@ -169,7 +140,7 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( continue; } - std::multimap min_skewed_task_buckets = + std::vector min_skewed_task_buckets = _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); if (min_skewed_task_buckets.empty()) { break; @@ -190,27 +161,10 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( int total_assigned_tasks = _partition_assignments[max_partition_value].size(); if (_partition_data_size[max_partition_value] >= (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { - bool found = false; - std::vector> - sorted_assigned_address_to_task_buckets_num( - _assigned_address_to_task_buckets_num.begin(), - _assigned_address_to_task_buckets_num.end()); - - std::sort(sorted_assigned_address_to_task_buckets_num.begin(), - sorted_assigned_address_to_task_buckets_num.end(), - [](const auto& a, const auto& b) { return a.second < b.second; }); - for (auto& pair : sorted_assigned_address_to_task_buckets_num) { - auto range = min_skewed_task_buckets.equal_range(pair.first); - for (auto it = range.first; it != range.second; ++it) { - TaskBucket& task_bucket = it->second; - if (_rebalance_partition(max_partition_value, task_bucket, max_task_buckets, - min_task_buckets)) { - scaled_partitions.push_back(max_partition_value); - found = true; - break; - } - } - if (found) { + for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { + if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); break; } } @@ -221,12 +175,12 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( } } -std::multimap +std::vector SkewedPartitionRebalancer::_find_skewed_min_task_buckets( const TaskBucket& max_task_bucket, const IndexedPriorityQueue& min_task_buckets) { - std::multimap min_skewed_task_buckets; + std::vector min_skewed_task_buckets; for (const auto& min_task_bucket : min_task_buckets) { double skewness = @@ -238,7 +192,7 @@ SkewedPartitionRebalancer::_find_skewed_min_task_buckets( break; } if (max_task_bucket.task_id != min_task_bucket.task_id) { - min_skewed_task_buckets.insert({min_task_bucket.task_address, min_task_bucket}); + min_skewed_task_buckets.push_back(min_task_bucket); } } return min_skewed_task_buckets; @@ -259,7 +213,6 @@ bool SkewedPartitionRebalancer::_rebalance_partition( } assignments.push_back(to_task_bucket); - _assigned_address_to_task_buckets_num[to_task_bucket.task_address]++; int new_task_count = assignments.size(); int old_task_count = new_task_count - 1; @@ -326,14 +279,10 @@ void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { IndexedPriorityQueue min_task_buckets; - for (int task_id = 0; task_id < _task_count; task_id++) { - for (int bucket_id = 0; bucket_id < _task_bucket_count; bucket_id++) { - TaskBucket task_bucket1(task_id, bucket_id, _task_bucket_count, - (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET - : _task_addresses[task_id]); - TaskBucket task_bucket2(task_id, bucket_id, _task_bucket_count, - (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET - : _task_addresses[task_id]); + for (int taskId = 0; taskId < _task_count; taskId++) { + for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) { + TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count); + TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count); _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] = _calculate_task_bucket_data_size_since_last_rebalance( task_bucket_max_partitions[task_bucket1.id]); @@ -350,4 +299,4 @@ void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { task_bucket_max_partitions); _data_processed_at_last_rebalance = data_processed; } -} // namespace doris::vectorized +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/skewed_partition_rebalancer.h b/be/src/vec/exec/skewed_partition_rebalancer.h index d89abd041e88b1..814ebc1d465ca1 100644 --- a/be/src/vec/exec/skewed_partition_rebalancer.h +++ b/be/src/vec/exec/skewed_partition_rebalancer.h @@ -49,7 +49,6 @@ #include #include #include -#include #include #include "util/indexed_priority_queue.hpp" @@ -61,13 +60,9 @@ class SkewedPartitionRebalancer { struct TaskBucket { int task_id; int id; - std::string task_address; - TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_, - const std::string& task_address_) - : task_id(task_id_), - id(task_id_ * task_bucket_count_ + bucket_id_), - task_address(task_address_) {} + TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_) + : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {} bool operator==(const TaskBucket& other) const { return id == other.id; } @@ -79,8 +74,7 @@ class SkewedPartitionRebalancer { public: SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold, - const std::vector* task_addresses = nullptr); + long min_data_processed_rebalance_threshold); std::vector> get_partition_assignments(); int get_task_count(); @@ -102,7 +96,7 @@ class SkewedPartitionRebalancer { std::vector< IndexedPriorityQueue>& task_bucket_max_partitions); - std::multimap _find_skewed_min_task_buckets( + std::vector _find_skewed_min_task_buckets( const TaskBucket& max_task_bucket, const IndexedPriorityQueue& @@ -119,14 +113,12 @@ class SkewedPartitionRebalancer { private: static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7; - static constexpr const char* TASK_BUCKET_ADDRESS_NOT_SET = "TASK_BUCKET_ADDRESS_NOT_SET"; int _partition_count; int _task_count; int _task_bucket_count; long _min_partition_data_processed_rebalance_threshold; long _min_data_processed_rebalance_threshold; - std::vector _task_addresses; std::vector _partition_row_count; long _data_processed; long _data_processed_at_last_rebalance; @@ -136,6 +128,5 @@ class SkewedPartitionRebalancer { std::vector _estimated_task_bucket_data_size_since_last_rebalance; std::vector> _partition_assignments; - std::unordered_map _assigned_address_to_task_buckets_num; }; -} // namespace doris::vectorized +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp index ec9c4163690d20..f7435249c20f64 100644 --- a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp +++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp @@ -33,13 +33,12 @@ class ScaleWriterPartitioningExchanger { ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& partition_function, int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold, - const std::vector* task_addresses) + long min_data_processed_rebalance_threshold) : _channel_size(channel_size), _partition_function(partition_function), _partition_rebalancer(partition_count, task_count, task_bucket_count, min_partition_data_processed_rebalance_threshold, - min_data_processed_rebalance_threshold, task_addresses), + min_data_processed_rebalance_threshold), _partition_row_counts(partition_count, 0), _partition_writer_ids(partition_count, -1), _partition_writer_indexes(partition_count, 0) {} @@ -90,4 +89,4 @@ class ScaleWriterPartitioningExchanger { std::vector _partition_writer_indexes; }; -} // namespace doris::vectorized +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 1886243718d8e1..8119b5a35f9772 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -342,8 +342,6 @@ class Channel { void set_receiver_eof(Status st) { _receiver_status = st; } - const TNetworkAddress& brpc_dest_addr() const { return _brpc_dest_addr; } - protected: bool _recvr_is_valid() { if (_local_recvr && !_local_recvr->is_closed()) {