From 5fec3f90bb748c26e9173a8f66a569e5344ce198 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Tue, 19 Dec 2023 11:42:22 +0800 Subject: [PATCH] upd --- be/src/vec/runtime/vdata_stream_recvr.cpp | 41 ++++++++++--------- be/src/vec/runtime/vdata_stream_recvr.h | 50 ++++++++++++++++++++++- 2 files changed, 71 insertions(+), 20 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index ca0da254e98310..56f6c51e68490d 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -100,7 +100,8 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block auto [next_block, block_byte_size] = std::move(_block_queue.front()); _recvr->update_blocks_memory_usage(-block_byte_size); _block_queue.pop_front(); - if (_block_queue.size() == 0 && _dependency) { + _record_debug_info(); + if (_block_queue.empty() && _dependency) { if (!_is_cancelled && _num_remaining_senders > 0) { _dependency->block(); } @@ -122,6 +123,16 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block return Status::OK(); } +void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() { + if (!_dependency) { + return; + } + const bool should_wait = !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0; + if (!should_wait) { + _dependency->set_ready(); + } +} + Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { @@ -178,9 +189,8 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num if (!empty) { _block_queue.emplace_back(std::move(block), block_byte_size); - if (_dependency) { - _dependency->set_ready(); - } + _record_debug_info(); + try_set_dep_ready_without_lock(); } // if done is nullptr, this function can't delay this response if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) { @@ -234,9 +244,8 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { if (!empty) { _block_queue.emplace_back(std::move(nblock), block_mem_size); - if (_dependency) { - _dependency->set_ready(); - } + _record_debug_info(); + try_set_dep_ready_without_lock(); _data_arrival_cv.notify_one(); } @@ -269,13 +278,12 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { _sender_eos_set.insert(be_number); DCHECK_GT(_num_remaining_senders, 0); _num_remaining_senders--; + _record_debug_info(); VLOG_FILE << "decremented senders: fragment_instance_id=" << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id() << " #senders=" << _num_remaining_senders; if (_num_remaining_senders == 0) { - if (_dependency) { - _dependency->set_ready(); - } + try_set_dep_ready_without_lock(); _data_arrival_cv.notify_one(); } } @@ -288,9 +296,7 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) { } _is_cancelled = true; _cancel_status = cancel_status; - if (_dependency) { - _dependency->set_ready(); - } + try_set_dep_ready_without_lock(); VLOG_QUERY << "cancelled stream: _fragment_instance_id=" << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id(); @@ -318,9 +324,7 @@ void VDataStreamRecvr::SenderQueue::close() { // is clear will be memory leak std::lock_guard l(_lock); _is_cancelled = true; - if (_dependency) { - _dependency->set_ready(); - } + try_set_dep_ready_without_lock(); for (auto closure_pair : _pending_closures) { closure_pair.first->Run(); @@ -561,9 +565,8 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block* block, bool use_move) { return; } _block_queue.emplace_back(std::move(nblock), block_mem_size); - if (_dependency) { - _dependency->set_ready(); - } + _record_debug_info(); + try_set_dep_ready_without_lock(); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); _recvr->update_blocks_memory_usage(block_mem_size); _data_arrival_cv.notify_one(); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index a09d86507d1216..e0b63459ad2d5e 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -240,6 +241,52 @@ class VDataStreamRecvr::SenderQueue { friend struct pipeline::ExchangeDataDependency; Status _inner_get_batch_without_lock(Block* block, bool* eos); + void try_set_dep_ready_without_lock(); + + // To record information about several variables in the event of a DCHECK failure. + // DCHECK(_is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0) +#ifndef NDEBUG + constexpr static auto max_record_number = 128; + std::list _record_block_queue; + std::list _record_num_remaining_senders; +#else +#endif + + // only in debug + ALWAYS_INLINE inline void _record_debug_info() { +#ifndef NDEBUG + if (_record_block_queue.size() > max_record_number) { + _record_block_queue.pop_front(); + } + if (_record_num_remaining_senders.size() > max_record_number) { + _record_num_remaining_senders.pop_front(); + } + _record_block_queue.push_back(_block_queue.size()); + _record_num_remaining_senders.push_back(_num_remaining_senders); +#else +#endif + } + + ALWAYS_INLINE inline std::string _debug_string_info() { +#ifndef NDEBUG + std::stringstream out; + DCHECK_EQ(_record_block_queue.size(), _record_num_remaining_senders.size()); + out << "record_debug_info [ \n"; + + auto it1 = _record_block_queue.begin(); + auto it2 = _record_num_remaining_senders.begin(); + for (; it1 != _record_block_queue.end(); it1++, it2++) { + out << "( " + << "_block_queue size : " << *it1 << " , _num_remaining_senders : " << *it2 + << " ) \n"; + } + out << " ]\n"; + return out.str(); +#else +#endif + return ""; + } + // Not managed by this class VDataStreamRecvr* _recvr = nullptr; std::mutex _lock; @@ -272,7 +319,8 @@ class VDataStreamRecvr::PipSenderQueue : public SenderQueue { DCHECK(_is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0) << " _is_cancelled: " << _is_cancelled << ", _block_queue_empty: " << _block_queue.empty() - << ", _num_remaining_senders: " << _num_remaining_senders; + << ", _num_remaining_senders: " << _num_remaining_senders << "\n" + << _debug_string_info(); return _inner_get_batch_without_lock(block, eos); }