Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block

DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
_recvr->_blocks_memory_usage->add(-block_byte_size);
_recvr->update_blocks_memory_usage(-block_byte_size);
_block_queue.pop_front();

if (!_pending_closures.empty()) {
Expand Down Expand Up @@ -173,7 +173,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
_pending_closures.emplace_back(*done, monotonicStopWatch);
*done = nullptr;
}
_recvr->_blocks_memory_usage->add(block_byte_size);
_recvr->update_blocks_memory_usage(block_byte_size);
if (!empty) {
_data_arrival_cv.notify_one();
}
Expand Down Expand Up @@ -220,7 +220,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
_data_arrival_cv.notify_one();
}

if (_recvr->exceeds_limit(block_mem_size)) {
// Careful: Accessing members of _recvr that are allocated by Object pool
// should be done before the following logic, because the _lock will be released
// by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, _recvr may
// have been closed and resouces in _recvr are released;
_recvr->update_blocks_memory_usage(block_mem_size);
if (_recvr->exceeds_limit(0)) {
// yiguolei
// It is too tricky here, if the running thread is bthread then the tid may be wrong.
std::thread::id tid = std::this_thread::get_id();
Expand All @@ -234,8 +239,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
_pending_closures.emplace_back(iter->second.get(), monotonicStopWatch);
iter->second->wait(l);
}

_recvr->_blocks_memory_usage->add(block_mem_size);
}

void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
Expand Down
12 changes: 10 additions & 2 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,21 @@ class VDataStreamRecvr {

void close();

// Careful: stream sender will call this function for a local receiver,
// accessing members of receiver that are allocated by Object pool
// in this function is not safe.
bool exceeds_limit(int batch_size) {
return _blocks_memory_usage->current_value() + batch_size >
return _blocks_memory_usage_current_value + batch_size >
config::exchg_node_buffer_size_bytes;
}

bool is_closed() const { return _is_closed; }

private:
void update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
_blocks_memory_usage_current_value = _blocks_memory_usage->current_value();
}
class SenderQueue;
class PipSenderQueue;

Expand Down Expand Up @@ -154,6 +161,7 @@ class VDataStreamRecvr {
RuntimeProfile::Counter* _decompress_bytes;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
RuntimeProfile::Counter* _peak_memory_usage_counter;

// Number of rows received
Expand Down Expand Up @@ -268,7 +276,7 @@ class VDataStreamRecvr::PipSenderQueue : public SenderQueue {
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->_blocks_memory_usage->add(block_mem_size);
_recvr->update_blocks_memory_usage(block_mem_size);
_data_arrival_cv.notify_one();
}
}
Expand Down