diff --git a/be/src/exec/aggregation_node.cpp b/be/src/exec/aggregation_node.cpp index d5c211dc7c69d0..5657d8fa1a68bd 100644 --- a/be/src/exec/aggregation_node.cpp +++ b/be/src/exec/aggregation_node.cpp @@ -199,7 +199,7 @@ Status AggregationNode::open(RuntimeState* state) { while (true) { bool eos = false; RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Aggregation, before getting next from child 0.")); RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos)); // SCOPED_TIMER(_build_timer); if (VLOG_ROW_IS_ON) { @@ -227,7 +227,7 @@ Status AggregationNode::open(RuntimeState* state) { } // RETURN_IF_LIMIT_EXCEEDED(state); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Aggregation, after hashing the child 0.")); COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets()); COUNTER_SET(memory_used_counter(), @@ -238,7 +238,7 @@ Status AggregationNode::open(RuntimeState* state) { batch.reset(); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Aggregation, after setting the counter.")); if (eos) { break; } @@ -262,7 +262,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Aggregation, before evaluating conjuncts.")); SCOPED_TIMER(_get_results_timer); if (reached_limit()) { @@ -280,7 +280,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* // maintenance every N iterations. 
if (count++ % N == 0) { RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Aggregation, while evaluating conjuncts.")); } int row_idx = row_batch->add_row(); TupleRow* row = row_batch->get_row(row_idx); diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp index b390a65dddd376..3d6736d12bf3be 100644 --- a/be/src/exec/cross_join_node.cpp +++ b/be/src/exec/cross_join_node.cpp @@ -66,8 +66,7 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) { RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos)); // to prevent use too many memory - RETURN_IF_LIMIT_EXCEEDED(state, - "Cross join was getting next from the child."); + RETURN_IF_LIMIT_EXCEEDED(state, "Cross join, while getting next from the child 1."); SCOPED_TIMER(_build_timer); _build_batches.add_row_batch(batch); diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index bad1f24917b5d0..b56af60555db18 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -151,8 +151,6 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* { SCOPED_TIMER(_convert_row_batch_timer); RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); // copy rows until we hit the limit/capacity or until we exhaust _input_batch while (!reached_limit() && !output_batch->at_capacity() && _input_batch != NULL && _next_row_idx < _input_batch->capacity()) { @@ -211,8 +209,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batch, bool* eos) { DCHECK_EQ(output_batch->num_rows(), 0); RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next.")); 
RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos)); while ((_num_rows_skipped < _offset)) { diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index c8d6dca7847161..5facfa6c5c2c54 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -724,10 +724,10 @@ Status ExecNode::enable_deny_reservation_debug_action() { } */ -Status ExecNode::QueryMaintenance(RuntimeState* state) { +Status ExecNode::QueryMaintenance(RuntimeState* state, const std::string& msg) { // TODO chenhao , when introduce latest AnalyticEvalNode open it // ScalarExprEvaluator::FreeLocalAllocations(evals_to_free_); - return state->check_query_state(); + return state->check_query_state(msg); } } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index fe055b9c2f88d9..647ee8b1f07579 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -30,6 +30,7 @@ #include "util/blocking_queue.hpp" #include "runtime/bufferpool/buffer_pool.h" #include "runtime/query_statistics.h" +#include "service/backend_options.h" namespace llvm { class Function; @@ -387,22 +388,29 @@ class ExecNode { /// Nodes may override this to add extra periodic cleanup, e.g. freeing other local /// allocations. ExecNodes overriding this function should return /// ExecNode::QueryMaintenance(). - virtual Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT; + virtual Status QueryMaintenance(RuntimeState* state, const std::string& msg) WARN_UNUSED_RESULT; private: bool _is_closed; }; +#define LIMIT_EXCEEDED(tracker, state, msg) \ + do { \ + stringstream str; \ + str << "Memory exceed limit. " << msg << " "; \ + str << "Backend: " << BackendOptions::get_localhost() << ", "; \ + str << "fragment: " << print_id(state->fragment_instance_id()) << " "; \ + str << "Used: " << tracker->consumption() << ", Limit: " << tracker->limit() << ". 
"; \ + str << "You can change the limit by session variable exec_mem_limit."; \ + return Status::MemoryLimitExceeded(str.str()); \ + } while (false) + #define RETURN_IF_LIMIT_EXCEEDED(state, msg) \ do { \ /* if (UNLIKELY(MemTracker::limit_exceeded(*(state)->mem_trackers()))) { */ \ MemTracker* tracker = state->instance_mem_tracker()->find_limit_exceeded_tracker(); \ if (tracker != nullptr) { \ - stringstream str; \ - str << "Memory exceed limit. " << msg << " "; \ - str << "Used: " << tracker->consumption() << ", Limit: " << tracker->limit() << ". "; \ - str << "You can change the limit by session variable exec_mem_limit."; \ - return Status::MemoryLimitExceeded(str.str()); \ + LIMIT_EXCEEDED(tracker, state, msg); \ } \ } while (false) } diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 0b471f7db47fd0..bfb1b3f0971d9b 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -232,8 +232,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) { SCOPED_TIMER(_build_timer); // take ownership of tuple data of build_batch _build_pool->acquire_data(build_batch.tuple_data_pool(), false); - RETURN_IF_LIMIT_EXCEEDED(state, - "Hash join was constructing the hash table."); + RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table."); // Call codegen version if possible if (_process_build_batch_fn == NULL) { diff --git a/be/src/exec/new_partitioned_aggregation_node.cc b/be/src/exec/new_partitioned_aggregation_node.cc index 1cbd405b90bee2..9ea08261353aa2 100644 --- a/be/src/exec/new_partitioned_aggregation_node.cc +++ b/be/src/exec/new_partitioned_aggregation_node.cc @@ -305,7 +305,8 @@ Status NewPartitionedAggregationNode::open(RuntimeState* state) { bool eos = false; do { RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(QueryMaintenance(state)); + RETURN_IF_ERROR(state->check_query_state( + "New partitioned aggregation, while getting next from child 0.")); 
RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos)); if (UNLIKELY(VLOG_ROW_IS_ON)) { for (int i = 0; i < batch.num_rows(); ++i) { @@ -407,7 +408,7 @@ Status NewPartitionedAggregationNode::GetNextInternal(RuntimeState* state, SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("New partitioned aggregation, while getting next.")); // clear tmp expr result alocations expr_results_pool_->clear(); @@ -486,7 +487,8 @@ Status NewPartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state, // maintenance every N iterations. if ((count++ & (N - 1)) == 0) { RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(QueryMaintenance(state)); + RETURN_IF_ERROR(state->check_query_state( + "New partitioned aggregation, while getting rows from partition.")); } int row_idx = row_batch->add_row(); @@ -527,7 +529,8 @@ Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, do { DCHECK_EQ(out_batch->num_rows(), 0); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(QueryMaintenance(state)); + RETURN_IF_ERROR(state->check_query_state( + "New partitioned aggregation, while getting rows in streaming.")); RETURN_IF_ERROR(child(0)->get_next(state, child_batch_.get(), &child_eos_)); SCOPED_TIMER(streaming_timer_); @@ -834,7 +837,11 @@ Status NewPartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) DCHECK(!parent->is_streaming_preagg_); DCHECK(!is_closed); DCHECK(!is_spilled()); - RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker())); + // TODO(ml): enable spill + std::stringstream msg; + msg << "New partitioned Aggregation in spill"; + LIMIT_EXCEEDED(parent->mem_tracker(), parent->state_, msg.str()); + // RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker())); RETURN_IF_ERROR(SerializeStreamForSpilling()); @@ -1345,7 +1352,7 @@ Status 
NewPartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_ RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos)); RETURN_IF_ERROR( ProcessBatch(&batch, ht_ctx_.get())); - RETURN_IF_ERROR(state_->check_query_state()); + RETURN_IF_ERROR(state_->check_query_state("New partitioned aggregation, while processing stream.")); batch.reset(); } while (!eos); } diff --git a/be/src/exec/new_partitioned_hash_table.cc b/be/src/exec/new_partitioned_hash_table.cc index c1234d61646979..7cf735d94aa1ec 100644 --- a/be/src/exec/new_partitioned_hash_table.cc +++ b/be/src/exec/new_partitioned_hash_table.cc @@ -23,6 +23,7 @@ #include "codegen/codegen_anyval.h" #include "codegen/llvm_codegen.h" +#include "exec/exec_node.h" #include "exprs/expr.h" #include "exprs/expr_context.h" #include "exprs/slot_ref.h" @@ -118,7 +119,7 @@ Status NewPartitionedHashTableCtx::Init(ObjectPool* pool, RuntimeState* state, i int scratch_row_size = sizeof(Tuple*) * num_build_tuples; scratch_row_ = reinterpret_cast(malloc(scratch_row_size)); if (UNLIKELY(scratch_row_ == NULL)) { - return Status::InternalError(Substitute("Failed to allocate $0 bytes for scratch row of " + return Status::InternalError(Substitute("Failed to allocate $0 bytes for scratch row of " "NewPartitionedHashTableCtx.", scratch_row_size)); } diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc index fbd4638f7b6a38..a9bc315f64584f 100644 --- a/be/src/exec/partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -232,8 +232,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { bool eos = false; do { RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while getting next from child 0.")); RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos)); if (UNLIKELY(VLOG_ROW_IS_ON)) { @@ -277,8 +276,7 @@ 
Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_b SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, before evaluating conjuncts.")); if (reached_limit()) { *eos = true; @@ -335,8 +333,7 @@ Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_b // maintenance every N iterations. if ((count++ & (N - 1)) == 0) { RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while evaluating conjuncts.")); } int row_idx = row_batch->add_row(); diff --git a/be/src/exec/sort_node.cpp b/be/src/exec/sort_node.cpp index 13bbc06038990f..da1d9a337ddc1d 100644 --- a/be/src/exec/sort_node.cpp +++ b/be/src/exec/sort_node.cpp @@ -146,8 +146,7 @@ Status SortNode::sort_input(RuntimeState* state) { RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos)); RETURN_IF_ERROR(_sorter->add_batch(&batch)); RETURN_IF_CANCELLED(state); - RETURN_IF_LIMIT_EXCEEDED(state, - "Sort node was getting next from the child."); + RETURN_IF_LIMIT_EXCEEDED(state, "Sort, while getting next from the child."); // RETURN_IF_ERROR(QueryMaintenance(state)); } while (!eos); RETURN_IF_ERROR(_sorter->input_done()); diff --git a/be/src/exec/spill_sort_node.cc b/be/src/exec/spill_sort_node.cc index e2bb07ca9ff2fa..907c463cd1d938 100644 --- a/be/src/exec/spill_sort_node.cc +++ b/be/src/exec/spill_sort_node.cc @@ -56,8 +56,7 @@ Status SpillSortNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_ERROR(_sort_exec_exprs.open(state)); RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + 
RETURN_IF_ERROR(state->check_query_state("Spill sort, while open.")); RETURN_IF_ERROR(child(0)->open(state)); // These objects must be created after opening the _sort_exec_exprs. Avoid creating @@ -88,8 +87,7 @@ Status SpillSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e // RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT, state)); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Spill sort, while getting next.")); if (reached_limit()) { *eos = true; @@ -166,8 +164,7 @@ Status SpillSortNode::sort_input(RuntimeState* state) { RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos)); RETURN_IF_ERROR(_sorter->add_batch(&batch)); RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Spill sort, while sorting input.")); } while (!eos); RETURN_IF_ERROR(_sorter->input_done()); diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index 0d6ca6ddcf628a..e1820b299c223d 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -87,8 +87,7 @@ Status TopNNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Top n, before open.")); RETURN_IF_ERROR(_sort_exec_exprs.open(state)); // Avoid creating them after every Reset()/Open(). 
@@ -122,8 +121,7 @@ Status TopNNode::open(RuntimeState* state) { insert_tuple_row(batch.get_row(i)); } RETURN_IF_CANCELLED(state); - // RETURN_IF_LIMIT_EXCEEDED(state); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Top n, while getting next from child 0.")); } while (!eos); } @@ -142,8 +140,7 @@ Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); RETURN_IF_CANCELLED(state); - // RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(state->check_query_state()); + RETURN_IF_ERROR(state->check_query_state("Top n, before moving result to row_batch.")); while (!row_batch->at_capacity() && (_get_next_iter != _sorted_top_n.end())) { if (_num_rows_skipped < _offset) { diff --git a/be/src/runtime/buffered_block_mgr.cpp b/be/src/runtime/buffered_block_mgr.cpp index ff8acc30040188..1615792e68a63d 100644 --- a/be/src/runtime/buffered_block_mgr.cpp +++ b/be/src/runtime/buffered_block_mgr.cpp @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+#include "exec/exec_node.h" #include "runtime/buffered_block_mgr.h" #include "runtime/runtime_state.h" #include "runtime/mem_pool.h" @@ -76,9 +77,7 @@ Status BufferedBlockMgr::get_new_block(Block** block, int64_t len) { new_block->_buffer_desc->block = new_block; *block = new_block; - if (UNLIKELY(_state->instance_mem_tracker()->any_limit_exceeded())) { - return Status::MemoryLimitExceeded("Memory limit exceeded"); - } + RETURN_IF_LIMIT_EXCEEDED(_state, "Buffered block mgr, while getting new block."); return Status::OK(); } diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index b9a8f506930c8d..6a650d33730946 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -24,6 +24,7 @@ //#include //include +#include "exec/exec_node.h" #include "gutil/strings/substitute.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -320,9 +321,9 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta // } // ss << tracker_to_log->LogUsage(); // Status status = Status::MemLimitExceeded(ss.str()); - Status status = Status::MemoryLimitExceeded("Memory limit exceeded"); - if (state != nullptr) state->log_error(status.get_error_msg()); - return status; + // state may be null here (the replaced code null-checked it); LIMIT_EXCEEDED dereferences state. + if (state == nullptr) return Status::MemoryLimitExceeded(ss.str()); + LIMIT_EXCEEDED(this, state, ss.str()); } void MemTracker::AddGcFunction(GcFunction f) { diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index a9f805a2a17946..729542e898715a 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -514,7 +514,7 @@ void PlanFragmentExecutor::update_status(const Status& status) { if (_status.ok()) { if (status.is_mem_limit_exceeded()) { - _runtime_state->set_mem_limit_exceeded(); + _runtime_state->set_mem_limit_exceeded(status.get_error_msg()); } _status = status; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 35b96dd5f40540..1787920ae3916f 100644 ---
a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -25,6 +25,7 @@ #include "codegen/llvm_codegen.h" #include "common/object_pool.h" #include "common/status.h" +#include "exec/exec_node.h" #include "exprs/expr.h" #include "exprs/timezone_db.h" #include "runtime/buffered_block_mgr.h" @@ -396,12 +397,10 @@ Status RuntimeState::set_mem_limit_exceeded( return _process_status; } -Status RuntimeState::check_query_state() { +Status RuntimeState::check_query_state(const std::string& msg) { // TODO: it would be nice if this also checked for cancellation, but doing so breaks // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached. - if (_instance_mem_tracker->any_limit_exceeded()) { - return set_mem_limit_exceeded(); - } + RETURN_IF_LIMIT_EXCEEDED(this, msg); return query_status(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 006996acebd45d..e771c86c963b07 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -334,7 +334,7 @@ class RuntimeState { // Returns a non-OK status if query execution should stop (e.g., the query was cancelled // or a mem limit was exceeded). Exec nodes should check this periodically so execution // doesn't continue if the query terminates abnormally. - Status check_query_state(); + Status check_query_state(const std::string& msg); std::vector& output_files() { return _output_files;