From 61e0eefb66a976a5b87ca4325ad31c525dde51a3 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 23 Jul 2020 11:23:34 +0800 Subject: [PATCH] [Spill To Disk][1/6] The adjustment of the basic BufferedBlockMgr includes the following change 1. Add Exec msg for BufferedBlockMgr for debug tuning 2. Change the API of Consume Memory? We will use it in HashTable in the future 3. Fix mistake of count _unfullfilled_reserved_buffers in BufferedBlockMgr issue:#3926 --- be/src/runtime/buffered_block_mgr2.cc | 34 +++++++++++++++++++-------- be/src/runtime/buffered_block_mgr2.h | 6 +++++ 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index 18f5b6383d07bf..d86a39b8c71925 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "exec/exec_node.h" #include "runtime/buffered_block_mgr2.h" - #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/mem_tracker.h" @@ -219,7 +219,8 @@ BufferedBlockMgr2::BufferedBlockMgr2(RuntimeState* state, TmpFileMgr* tmp_file_m _non_local_outstanding_writes(0), _io_mgr(state->exec_env()->disk_io_mgr()), _is_cancelled(false), - _writes_issued(0) { + _writes_issued(0), + _state(state){ } Status BufferedBlockMgr2::create( @@ -313,6 +314,11 @@ bool BufferedBlockMgr2::try_acquire_tmp_reservation(Client* client, int num_buff } bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { + // Later, we use this interface to manage the consumption of memory of hashtable instead of ReservationTracker. + // So it is possible to allocate 0, which has no additional impact on the behavior of BufferedBlockMgr. + // The process of memory allocation still by BufferPool, Because bufferpool has done a lot of optimization in memory allocation + // which is better than using the new operator directly. + if (size == 0) return true; // Workaround IMPALA-1619. Return immediately if the allocation size will cause // an arithmetic overflow. if (UNLIKELY(size >= (1LL << 31))) { @@ -321,7 +327,6 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { return false; } int buffers_needed = BitUtil::ceil(size, max_block_size()); - DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory"; unique_lock lock(_lock); if (size < max_block_size() && _mem_tracker->try_consume(size)) { @@ -331,7 +336,7 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { return true; } - if (std::max(0, remaining_unreserved_buffers()) + + if (available_buffers(client) + client->_num_tmp_reserved_buffers < buffers_needed) { return false; } @@ -436,7 +441,15 @@ Status BufferedBlockMgr2::mem_limit_too_low_error(Client* client, int node_id) { << PrettyPrinter::print( client->_num_reserved_buffers * max_block_size(), TUnit::BYTES) << "."; - return Status::MemoryLimitExceeded(error_msg.str()); + return add_exec_msg(error_msg.str()); +} + +Status BufferedBlockMgr2::add_exec_msg(const std::string& msg) const { + stringstream str; + str << msg << " "; + str << "Backend: " << BackendOptions::get_localhost() << ", "; + str << "fragment: " << print_id(_state->fragment_instance_id()) << " "; + return Status::MemoryLimitExceeded(str.str()); } Status BufferedBlockMgr2::get_new_block( @@ -914,11 +927,12 @@ void BufferedBlockMgr2::delete_block(Block* block) { if (block->is_max_size()) { --_total_pinned_buffers; } - block->_is_pinned = false; block->_client->unpin_buffer(block->_buffer_desc); - if (block->_client->_num_pinned_buffers < block->_client->_num_reserved_buffers) { + // Only block is io size we need change _unfullfilled_reserved_buffers + if (block->is_max_size() && block->_client->_num_pinned_buffers < block->_client->_num_reserved_buffers) { ++_unfullfilled_reserved_buffers; } + block->_is_pinned = false; } else if (_unpinned_blocks.contains(block)) { // Remove block from unpinned list. _unpinned_blocks.remove(block); @@ -1034,7 +1048,7 @@ Status BufferedBlockMgr2::find_buffer_for_block(Block* block, bool* in_mem) { << endl << debug_internal() << endl << client->debug_string(); VLOG_QUERY << ss.str(); } - return Status::MemoryLimitExceeded("Query did not have enough memory to get the minimum required " + return add_exec_msg("Query did not have enough memory to get the minimum required " "buffers in the block manager."); } @@ -1089,8 +1103,8 @@ Status BufferedBlockMgr2::find_buffer( // There are no free buffers. If spills are disabled or there no unpinned blocks we // can write, return. We can't get a buffer. if (!_enable_spill) { - return Status::InternalError("Spilling has been disabled for plans," - "current memory usage has reached the bottleneck." + return add_exec_msg("Spilling has been disabled for plans," + "current memory usage has reached the bottleneck. " "You can avoid the behavior via increasing the mem limit " "by session variable exec_mem_limior or enable spilling."); } diff --git a/be/src/runtime/buffered_block_mgr2.h b/be/src/runtime/buffered_block_mgr2.h index 982fa07e7366f5..b2d355b1f63538 100644 --- a/be/src/runtime/buffered_block_mgr2.h +++ b/be/src/runtime/buffered_block_mgr2.h @@ -508,6 +508,9 @@ class BufferedBlockMgr2 { bool validate() const; std::string debug_internal() const; + // Add BE hostname and fragmentid for debug tuning + Status add_exec_msg(const std::string& msg) const; + // Size of the largest/default block in bytes. const int64_t _max_block_size; @@ -638,6 +641,9 @@ class BufferedBlockMgr2 { typedef boost::unordered_map > BlockMgrsMap; static BlockMgrsMap _s_query_to_block_mgrs; + // Unowned. + RuntimeState* _state; + }; // class BufferedBlockMgr2 } // end namespace doris