From faa6e21e85c5af65374aa53dea812044e1ab5c4e Mon Sep 17 00:00:00 2001 From: HappenLee Date: Tue, 15 Apr 2025 23:45:29 +0800 Subject: [PATCH 1/5] support one rpc send muti blocks --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 216 +++++++++++++----- be/src/pipeline/exec/exchange_sink_buffer.h | 9 +- be/src/vec/runtime/vdata_stream_mgr.cpp | 12 + .../org/apache/doris/qe/SessionVariable.java | 6 + gensrc/proto/internal_service.proto | 1 + gensrc/thrift/PaloInternalService.thrift | 1 + 6 files changed, 187 insertions(+), 58 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 95a5a00f68af75..1285b2c2612fa9 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -100,7 +100,13 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_ _node_id(node_id), _state(state), _context(state->get_query_ctx()), - _exchange_sink_num(sender_ins_ids.size()) {} + _exchange_sink_num(sender_ins_ids.size()), + _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size && + state->query_options().exchange_multi_blocks_byte_size > 0) { + if (_send_multi_blocks) { + _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size; + } +} void ExchangeSinkBuffer::close() { // Could not clear the queue here, because there maybe a running rpc want to @@ -124,9 +130,12 @@ void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { auto instance_data = std::make_unique(low_id); instance_data->mutex = std::make_unique(); instance_data->seq = 0; - instance_data->package_queue = std::queue>(); - instance_data->broadcast_package_queue = - std::queue>(); + instance_data->package_queue = + std::unordered_map>>(); + instance_data->broadcast_package_queue = std::unordered_map< + vectorized::Channel*, + std::queue>>(); _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size(); PUniqueId finst_id; @@ -173,7 +182,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), request.block->ByteSizeLong()); } - instance_data.package_queue.emplace(std::move(request)); + instance_data.package_queue[request.channel].emplace(std::move(request)); _total_queue_size++; if (_total_queue_size > _queue_capacity) { for (auto& dep : _queue_deps) { @@ -213,7 +222,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version( request.block_holder->get_block()->be_exec_version())); } - instance_data.broadcast_package_queue.emplace(request); + instance_data.broadcast_package_queue[request.channel].emplace(request); } if (send_now) { RETURN_IF_ERROR(_send_rpc(instance_data)); @@ -225,9 +234,27 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { std::unique_lock lock(*(instance_data.mutex)); - std::queue>& q = instance_data.package_queue; - std::queue>& broadcast_q = - instance_data.broadcast_package_queue; + auto& q_map = instance_data.package_queue; + auto& broadcast_q_map = instance_data.broadcast_package_queue; + + auto find_max_size_queue = [](auto& ptr, auto& map) { + for (auto& [_, lists] : map) { + if (!ptr) { + if (!lists.empty()) { + ptr = &lists; + } + } else { + if (ptr->size() < lists.size()) { + ptr = &lists; + } + } + } + }; + std::queue>* q_ptr = nullptr; + find_max_size_queue(q_ptr, q_map); + + std::queue>* broadcast_q_ptr = nullptr; + find_max_size_queue(broadcast_q_ptr, broadcast_q_map); if (_is_failed) { _turn_off_channel(instance_data, lock); @@ -237,19 +264,49 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { return Status::OK(); } - if (!q.empty()) { + auto mem_byte = 0; + if (q_ptr && !q_ptr->empty()) { + auto& q = *q_ptr; + + std::vector requests(_send_multi_blocks ? q.size() : 1); + for (int i = 0; i < requests.size(); i++) { + requests[i] = std::move(q.front()); + q.pop(); + + if (requests[i].block) { + // make sure rpc byte size under the _send_multi_blocks_bytes_size + mem_byte += requests[i].block->ByteSizeLong(); + if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) { + requests.resize(i + 1); + break; + } + } + } + // If we have data to shuffle which is not broadcasted - auto& request = q.front(); + auto& request = requests[0]; auto& brpc_request = instance_data.request; - brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(instance_data.seq++); brpc_request->set_sender_id(request.channel->_parent->sender_id()); brpc_request->set_be_number(request.channel->_parent->be_number()); - if (request.block && !request.block->column_metas().empty()) { - brpc_request->set_allocated_block(request.block.get()); + + if (_send_multi_blocks) { + for (auto& req : requests) { + if (req.block && !req.block->column_metas().empty()) { + auto add_block = brpc_request->add_blocks(); + add_block->Swap(req.block.get()); + } + } + } else { + if (request.block && !request.block->column_metas().empty()) { + brpc_request->set_allocated_block(request.block.get()); + } } - auto send_callback = request.channel->get_send_callback(&instance_data, request.eos); + instance_data.seq += requests.size(); + brpc_request->set_packet_seq(instance_data.seq); + brpc_request->set_eos(requests.back().eos); + auto send_callback = + request.channel->get_send_callback(&instance_data, requests.back().eos); send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); @@ -314,31 +371,74 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { std::move(send_remote_block_closure)); } } - if (request.block) { - COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), - -request.block->ByteSizeLong()); + + if (!_send_multi_blocks && request.block) { static_cast(brpc_request->release_block()); + } else { + brpc_request->clear_blocks(); } - q.pop(); - _total_queue_size--; + if (mem_byte) { + COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), -mem_byte); + } + DCHECK_GE(_total_queue_size, requests.size()); + _total_queue_size -= (int)requests.size(); if (_total_queue_size <= _queue_capacity) { for (auto& dep : _queue_deps) { dep->set_ready(); } } - } else if (!broadcast_q.empty()) { + } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) { + auto& broadcast_q = *broadcast_q_ptr; // If we have data to shuffle which is broadcasted - auto& request = broadcast_q.front(); + std::vector requests(_send_multi_blocks ? broadcast_q.size() : 1); + for (int i = 0; i < requests.size(); i++) { + requests[i] = broadcast_q.front(); + broadcast_q.pop(); + + if (requests[i].block_holder->get_block()) { + // make sure rpc byte size under the _send_multi_blocks_bytes_size + mem_byte += requests[i].block_holder->get_block()->ByteSizeLong(); + if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) { + requests.resize(i + 1); + break; + } + } + } + + auto& request = requests[0]; auto& brpc_request = instance_data.request; - brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(instance_data.seq++); brpc_request->set_sender_id(request.channel->_parent->sender_id()); brpc_request->set_be_number(request.channel->_parent->be_number()); - if (request.block_holder->get_block() && - !request.block_holder->get_block()->column_metas().empty()) { - brpc_request->set_allocated_block(request.block_holder->get_block()); + + if (_send_multi_blocks) { + for (int i = 0; i < requests.size(); i++) { + auto& req = requests[i]; + if (auto block = req.block_holder->get_block(); + block && !block->column_metas().empty()) { + auto add_block = brpc_request->add_blocks(); + for (int j = 0; j < block->column_metas_size(); ++j) { + add_block->add_column_metas()->CopyFrom(block->column_metas(j)); + } + add_block->set_be_exec_version(block->be_exec_version()); + add_block->set_compressed(block->compressed()); + add_block->set_compression_type(block->compression_type()); + add_block->set_uncompressed_size(block->uncompressed_size()); + add_block->set_allocated_column_values( + const_cast(&block->column_values())); + } + } + } else { + if (request.block_holder->get_block() && + !request.block_holder->get_block()->column_metas().empty()) { + brpc_request->set_allocated_block(request.block_holder->get_block()); + } } - auto send_callback = request.channel->get_send_callback(&instance_data, request.eos); + instance_data.seq += requests.size(); + brpc_request->set_packet_seq(instance_data.seq); + brpc_request->set_eos(requests.back().eos); + auto send_callback = + request.channel->get_send_callback(&instance_data, requests.back().eos); + send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); @@ -403,10 +503,14 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { std::move(send_remote_block_closure)); } } - if (request.block_holder->get_block()) { + if (!_send_multi_blocks && request.block_holder->get_block()) { static_cast(brpc_request->release_block()); + } else { + for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) { + static_cast(brpc_request->mutable_blocks(i)->release_column_values()); + } + brpc_request->clear_blocks(); } - broadcast_q.pop(); } else { instance_data.rpc_channel_is_idle = true; } @@ -436,27 +540,27 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { // and the rpc_channel should be turned off immediately. Defer turn_off([&]() { _turn_off_channel(ins, lock); }); - std::queue>& broadcast_q = - ins.broadcast_package_queue; - for (; !broadcast_q.empty(); broadcast_q.pop()) { - if (broadcast_q.front().block_holder->get_block()) { - COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(), - -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); + auto& broadcast_q_map = ins.broadcast_package_queue; + for (auto& [_, broadcast_q] : broadcast_q_map) { + for (; !broadcast_q.empty(); broadcast_q.pop()) { + if (broadcast_q.front().block_holder->get_block()) { + COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(), + -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); + } } } - { - std::queue> empty; - swap(empty, broadcast_q); - } - - std::queue>& q = ins.package_queue; - for (; !q.empty(); q.pop()) { - // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, - // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked - _total_queue_size--; - if (q.front().block) { - COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(), - -q.front().block->ByteSizeLong()); + broadcast_q_map.clear(); + + auto& q_map = ins.package_queue; + for (auto& [_, q] : q_map) { + for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked + _total_queue_size--; + if (q.front().block) { + COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(), + -q.front().block->ByteSizeLong()); + } } } @@ -467,10 +571,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { } } - { - std::queue> empty; - swap(empty, q); - } + q_map.clear(); } // The unused parameter `with_lock` is to ensure that the function is called when the lock is held. @@ -582,8 +683,11 @@ std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { fmt::memory_buffer debug_string_buffer; for (auto& [id, instance_data] : _rpc_instances) { std::unique_lock lock(*instance_data->mutex); - fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, - instance_data->package_queue.size()); + auto queue_size = 0; + for (auto& [_, list] : instance_data->package_queue) { + queue_size += list.size(); + } + fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size); } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 9a00ef072d548c..1489ea45d1d13a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -144,10 +144,13 @@ struct RpcInstance { int64_t seq = 0; // Queue for regular data transmission requests - std::queue> package_queue; + std::unordered_map>> + package_queue; // Queue for broadcast data transmission requests - std::queue> broadcast_package_queue; + std::unordered_map>> + broadcast_package_queue; // RPC request parameters for data transmission std::shared_ptr request; @@ -341,6 +344,8 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel. std::vector _parents; const int64_t _exchange_sink_num; + bool _send_multi_blocks = false; + int _send_multi_blocks_byte_size = 256 * 1024; }; } // namespace pipeline diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 535f59c49e8fd4..2a4f4e22861beb 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -146,6 +146,18 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, } bool eos = request->eos(); + if (!request->blocks().empty()) { + for (int i = 0; i < request->blocks_size(); i++) { + std::unique_ptr pblock_ptr = std::make_unique(); + pblock_ptr->Swap(const_cast(&request->blocks(i))); + RETURN_IF_ERROR(recvr->add_block( + std::move(pblock_ptr), request->sender_id(), request->be_number(), + request->packet_seq() - request->blocks_size() + i, eos ? nullptr : done, + wait_for_worker, cpu_time_stop_watch.elapsed_time())); + } + } + + // old logic, for compatibility if (request->has_block()) { std::unique_ptr pblock_ptr { const_cast(request)->release_block()}; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index c7d26af924fda9..5eff419c1a4959 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -674,6 +674,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_ES_PARALLEL_SCROLL = "enable_es_parallel_scroll"; + public static final String EXCHANGE_MULTI_BLOCKS_BYTE_SIZE = "exchange_multi_blocks_byte_size"; + public static final List DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -2243,6 +2245,9 @@ public boolean isEnableHboNonStrictMatchingMode() { "When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"}) public boolean keepCarriageReturn = false; + @VariableMgr.VarAttr(name = EXCHANGE_MULTI_BLOCKS_BYTE_SIZE, + description = {"enable exchange send multi blocks in one rpc, default is 256KB. negatives means disable"}) + public int exchangeMultiBlocksByteSize = 256 * 1024; @VariableMgr.VarAttr(name = FORCE_JNI_SCANNER, description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"}) @@ -4165,6 +4170,7 @@ public TQueryOptions toThrift() { tResult.setEnableRuntimeFilterPartitionPrune(enableRuntimeFilterPartitionPrune); tResult.setMinimumOperatorMemoryRequiredKb(minimumOperatorMemoryRequiredKB); + tResult.setExchangeMultiBlocksByteSize(exchangeMultiBlocksByteSize); return tResult; } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 402f1044568454..e36907e9ea1a3e 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -51,6 +51,7 @@ message PTransmitDataParams { optional bool transfer_by_attachment = 10 [default = false]; optional PUniqueId query_id = 11; optional PStatus exec_status = 12; + repeated PBlock blocks = 13; }; message PTransmitDataResult { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 60ca569949feaf..81e4d1f877cb2e 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -393,6 +393,7 @@ struct TQueryOptions { 162: optional bool dump_heap_profile_when_mem_limit_exceeded = false 163: optional bool inverted_index_compatible_read = false 164: optional bool check_orc_init_sargs_success = false + 165: optional i32 exchange_multi_blocks_byte_size = 262144 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. From d939ce048b327db08fd63e081be9a3ea78d1cb10 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 17 Apr 2025 23:33:28 +0800 Subject: [PATCH 2/5] add debug log for taikang --- .../src/main/java/org/apache/doris/qe/Coordinator.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8d34183f3cf6a4..f422c859170ac4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2155,6 +2155,11 @@ protected void computeScanRangeAssignment() throws Exception { replicaNumPerHost, isEnableOrderedLocations); } } + + if (LOG.isDebugEnabled()) { + LOG.debug("query id:{}, per backend replica:{}", DebugUtil.printId(queryId), + assignedBytesPerHost.toString()); + } } // To ensure the same bucketSeq tablet to the same execHostPort From b698500022c94ff3157352b7d01bd0dcbae4c50f Mon Sep 17 00:00:00 2001 From: HappenLee Date: Wed, 23 Apr 2025 15:01:23 +0800 Subject: [PATCH 3/5] remove unless channel ptr in trans message --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 72 +++++++++---------- be/src/pipeline/exec/exchange_sink_buffer.h | 6 +- be/src/vec/sink/vdata_stream_sender.cpp | 4 +- be/test/vec/exec/exchange_sink_test.h | 5 +- 4 files changed, 42 insertions(+), 45 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 1285b2c2612fa9..fc28f8b115c6ce 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -155,14 +155,14 @@ void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { _rpc_instances[low_id] = std::move(instance_data); } -Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { +Status ExchangeSinkBuffer::add_block(vectorized::Channel* channel, TransmitInfo&& request) { if (_is_failed) { return Status::OK(); } - auto ins_id = request.channel->dest_ins_id(); + auto ins_id = channel->dest_ins_id(); if (!_rpc_instances.contains(ins_id)) { return Status::InternalError("fragment_instance_id {} not do register_sink", - print_id(request.channel->_fragment_instance_id)); + print_id(channel->_fragment_instance_id)); } auto& instance_data = *_rpc_instances[ins_id]; if (instance_data.rpc_channel_is_turn_off) { @@ -179,10 +179,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { if (request.block) { RETURN_IF_ERROR( BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); - COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), - request.block->ByteSizeLong()); + COUNTER_UPDATE(channel->_parent->memory_used_counter(), request.block->ByteSizeLong()); } - instance_data.package_queue[request.channel].emplace(std::move(request)); + instance_data.package_queue[channel].emplace(std::move(request)); _total_queue_size++; if (_total_queue_size > _queue_capacity) { for (auto& dep : _queue_deps) { @@ -197,14 +196,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { return Status::OK(); } -Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { +Status ExchangeSinkBuffer::add_block(vectorized::Channel* channel, + BroadcastTransmitInfo&& request) { if (_is_failed) { return Status::OK(); } - auto ins_id = request.channel->dest_ins_id(); + auto ins_id = channel->dest_ins_id(); if (!_rpc_instances.contains(ins_id)) { return Status::InternalError("fragment_instance_id {} not do register_sink", - print_id(request.channel->_fragment_instance_id)); + print_id(channel->_fragment_instance_id)); } auto& instance_data = *_rpc_instances[ins_id]; if (instance_data.rpc_channel_is_turn_off) { @@ -222,7 +222,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version( request.block_holder->get_block()->be_exec_version())); } - instance_data.broadcast_package_queue[request.channel].emplace(request); + instance_data.broadcast_package_queue[channel].emplace(request); } if (send_now) { RETURN_IF_ERROR(_send_rpc(instance_data)); @@ -237,24 +237,28 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { auto& q_map = instance_data.package_queue; auto& broadcast_q_map = instance_data.broadcast_package_queue; - auto find_max_size_queue = [](auto& ptr, auto& map) { - for (auto& [_, lists] : map) { + auto find_max_size_queue = [](vectorized::Channel*& channel, auto& ptr, auto& map) { + for (auto& [chan, lists] : map) { if (!ptr) { if (!lists.empty()) { + channel = chan; ptr = &lists; } } else { if (ptr->size() < lists.size()) { + channel = chan; ptr = &lists; } } } }; - std::queue>* q_ptr = nullptr; - find_max_size_queue(q_ptr, q_map); + vectorized::Channel* channel = nullptr; + + std::queue>* q_ptr = nullptr; + find_max_size_queue(channel, q_ptr, q_map); std::queue>* broadcast_q_ptr = nullptr; - find_max_size_queue(broadcast_q_ptr, broadcast_q_map); + find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map); if (_is_failed) { _turn_off_channel(instance_data, lock); @@ -286,8 +290,8 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { // If we have data to shuffle which is not broadcasted auto& request = requests[0]; auto& brpc_request = instance_data.request; - brpc_request->set_sender_id(request.channel->_parent->sender_id()); - brpc_request->set_be_number(request.channel->_parent->be_number()); + brpc_request->set_sender_id(channel->_parent->sender_id()); + brpc_request->set_be_number(channel->_parent->be_number()); if (_send_multi_blocks) { for (auto& req : requests) { @@ -305,9 +309,8 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { instance_data.seq += requests.size(); brpc_request->set_packet_seq(instance_data.seq); brpc_request->set_eos(requests.back().eos); - auto send_callback = - request.channel->get_send_callback(&instance_data, requests.back().eos); - send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); + auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos); + send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms); if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } @@ -365,10 +368,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (enable_http_send_block(*brpc_request)) { RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), std::move(send_remote_block_closure), - request.channel->_brpc_dest_addr)); + channel->_brpc_dest_addr)); } else { - transmit_blockv2(*request.channel->_brpc_stub, - std::move(send_remote_block_closure)); + transmit_blockv2(*channel->_brpc_stub, std::move(send_remote_block_closure)); } } @@ -378,7 +380,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { brpc_request->clear_blocks(); } if (mem_byte) { - COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), -mem_byte); + COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte); } DCHECK_GE(_total_queue_size, requests.size()); _total_queue_size -= (int)requests.size(); @@ -407,8 +409,8 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { auto& request = requests[0]; auto& brpc_request = instance_data.request; - brpc_request->set_sender_id(request.channel->_parent->sender_id()); - brpc_request->set_be_number(request.channel->_parent->be_number()); + brpc_request->set_sender_id(channel->_parent->sender_id()); + brpc_request->set_be_number(channel->_parent->be_number()); if (_send_multi_blocks) { for (int i = 0; i < requests.size(); i++) { @@ -436,10 +438,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { instance_data.seq += requests.size(); brpc_request->set_packet_seq(instance_data.seq); brpc_request->set_eos(requests.back().eos); - auto send_callback = - request.channel->get_send_callback(&instance_data, requests.back().eos); + auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos); - send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); + send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms); if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } @@ -497,10 +498,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (enable_http_send_block(*brpc_request)) { RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), std::move(send_remote_block_closure), - request.channel->_brpc_dest_addr)); + channel->_brpc_dest_addr)); } else { - transmit_blockv2(*request.channel->_brpc_stub, - std::move(send_remote_block_closure)); + transmit_blockv2(*channel->_brpc_stub, std::move(send_remote_block_closure)); } } if (!_send_multi_blocks && request.block_holder->get_block()) { @@ -541,10 +541,10 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { Defer turn_off([&]() { _turn_off_channel(ins, lock); }); auto& broadcast_q_map = ins.broadcast_package_queue; - for (auto& [_, broadcast_q] : broadcast_q_map) { + for (auto& [channel, broadcast_q] : broadcast_q_map) { for (; !broadcast_q.empty(); broadcast_q.pop()) { if (broadcast_q.front().block_holder->get_block()) { - COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(), + COUNTER_UPDATE(channel->_parent->memory_used_counter(), -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); } } @@ -552,13 +552,13 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { broadcast_q_map.clear(); auto& q_map = ins.package_queue; - for (auto& [_, q] : q_map) { + for (auto& [channel, q] : q_map) { for (; !q.empty(); q.pop()) { // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked _total_queue_size--; if (q.front().block) { - COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(), + COUNTER_UPDATE(channel->_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 1489ea45d1d13a..44416ef68e1b0a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -111,13 +111,11 @@ class BroadcastPBlockHolderMemLimiter namespace pipeline { struct TransmitInfo { - vectorized::Channel* channel = nullptr; std::unique_ptr block; bool eos; }; struct BroadcastTransmitInfo { - vectorized::Channel* channel = nullptr; std::shared_ptr block_holder = nullptr; bool eos; }; @@ -278,8 +276,8 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { void construct_request(TUniqueId); - Status add_block(TransmitInfo&& request); - Status add_block(BroadcastTransmitInfo&& request); + Status add_block(vectorized::Channel* channel, TransmitInfo&& request); + Status add_block(vectorized::Channel* channel, BroadcastTransmitInfo&& request); void close(); void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 3f09d02f20f577..0e9c5371a2bf67 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -174,7 +174,7 @@ Status Channel::send_remote_block(std::unique_ptr&& block, bool eos) { } } if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos})); + RETURN_IF_ERROR(_buffer->add_block(this, {std::move(block), eos})); } return Status::OK(); } @@ -188,7 +188,7 @@ Status Channel::send_broadcast_block(std::shared_ptr& blo _eos_send = true; } if (eos || block->get_block()->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); + RETURN_IF_ERROR(_buffer->add_block(this, {block, eos})); } return Status::OK(); } diff --git a/be/test/vec/exec/exchange_sink_test.h b/be/test/vec/exec/exchange_sink_test.h index 59004a53a9e693..5cf0fdbed6a030 100644 --- a/be/test/vec/exec/exchange_sink_test.h +++ b/be/test/vec/exec/exchange_sink_test.h @@ -138,9 +138,8 @@ struct SinkWithChannel { std::map> channels; Status add_block(int64_t id, bool eos) { auto channel = channels[id]; - TransmitInfo transmitInfo { - .channel = channel.get(), .block = std::make_unique(), .eos = eos}; - return buffer->add_block(std::move(transmitInfo)); + TransmitInfo transmitInfo {.block = std::make_unique(), .eos = eos}; + return buffer->add_block(channel.get(), std::move(transmitInfo)); } }; From 746636cb6057f0f670eaf22666b7b0f55f3468a4 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Wed, 23 Apr 2025 15:01:57 +0800 Subject: [PATCH 4/5] Revert "add debug log for taikang" This reverts commit efed870d5139d82cd7ee1a28b41d83b438c06b05. --- .../src/main/java/org/apache/doris/qe/Coordinator.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index f422c859170ac4..8d34183f3cf6a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2155,11 +2155,6 @@ protected void computeScanRangeAssignment() throws Exception { replicaNumPerHost, isEnableOrderedLocations); } } - - if (LOG.isDebugEnabled()) { - LOG.debug("query id:{}, per backend replica:{}", DebugUtil.printId(queryId), - assignedBytesPerHost.toString()); - } } // To ensure the same bucketSeq tablet to the same execHostPort From 9dd7eb79da2ec326feacad5d62559a520cff770e Mon Sep 17 00:00:00 2001 From: HappenLee Date: Sun, 27 Apr 2025 23:49:51 +0800 Subject: [PATCH 5/5] change comment --- .../main/java/org/apache/doris/qe/SessionVariable.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 5eff419c1a4959..67e52b661337be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -2246,7 +2246,8 @@ public boolean isEnableHboNonStrictMatchingMode() { public boolean keepCarriageReturn = false; @VariableMgr.VarAttr(name = EXCHANGE_MULTI_BLOCKS_BYTE_SIZE, - description = {"enable exchange send multi blocks in one rpc, default is 256KB. negatives means disable"}) + description = {"Enable exchange to send multiple blocks in one RPC. Default is 256KB. A negative" + + " value disables multi-block exchange."}) public int exchangeMultiBlocksByteSize = 256 * 1024; @VariableMgr.VarAttr(name = FORCE_JNI_SCANNER, @@ -2596,6 +2597,12 @@ public void initFuzzyModeVariables() { this.disableStreamPreaggregations = random.nextBoolean(); this.enableShareHashTableForBroadcastJoin = random.nextBoolean(); this.enableParallelResultSink = random.nextBoolean(); + + // 4KB = 4 * 1024 bytes + int minBytes = 4 * 1024; + // 10MB = 10 * 1024 * 1024 bytes + int maxBytes = 10 * 1024 * 1024; + this.exchangeMultiBlocksByteSize = minBytes + (int) (random.nextDouble() * (maxBytes - minBytes)); int randomInt = random.nextInt(4); if (randomInt % 2 == 0) { this.rewriteOrToInPredicateThreshold = 100000;