Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions be/src/exec/aggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ Status AggregationNode::open(RuntimeState* state) {
while (true) {
bool eos = false;
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, before getting next from child 0."));
RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
// SCOPED_TIMER(_build_timer);
if (VLOG_ROW_IS_ON) {
Expand Down Expand Up @@ -227,7 +227,7 @@ Status AggregationNode::open(RuntimeState* state) {
}

// RETURN_IF_LIMIT_EXCEEDED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, after hashing the child 0."));

COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets());
COUNTER_SET(memory_used_counter(),
Expand All @@ -238,7 +238,7 @@ Status AggregationNode::open(RuntimeState* state) {

batch.reset();

RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, after setting the counter."));
if (eos) {
break;
}
Expand All @@ -262,7 +262,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, before evaluating conjuncts."));
SCOPED_TIMER(_get_results_timer);

if (reached_limit()) {
Expand All @@ -280,7 +280,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
// maintenance every N iterations.
if (count++ % N == 0) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, while evaluating conjuncts."));
}
int row_idx = row_batch->add_row();
TupleRow* row = row_batch->get_row(row_idx);
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/cross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) {
RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));

// to prevent using too much memory
RETURN_IF_LIMIT_EXCEEDED(state,
"Cross join was getting next from the child.");
RETURN_IF_LIMIT_EXCEEDED(state, "Cross join, while getting next from the child 1.");

SCOPED_TIMER(_build_timer);
_build_batches.add_row_batch(batch);
Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool*
{
SCOPED_TIMER(_convert_row_batch_timer);
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
// copy rows until we hit the limit/capacity or until we exhaust _input_batch
while (!reached_limit() && !output_batch->at_capacity()
&& _input_batch != NULL && _next_row_idx < _input_batch->capacity()) {
Expand Down Expand Up @@ -211,8 +209,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool*
Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batch, bool* eos) {
DCHECK_EQ(output_batch->num_rows(), 0);
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next."));

RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
while ((_num_rows_skipped < _offset)) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,10 +724,10 @@ Status ExecNode::enable_deny_reservation_debug_action() {
}
*/

Status ExecNode::QueryMaintenance(RuntimeState* state) {
Status ExecNode::QueryMaintenance(RuntimeState* state, const std::string& msg) {
// TODO chenhao , when introduce latest AnalyticEvalNode open it
// ScalarExprEvaluator::FreeLocalAllocations(evals_to_free_);
return state->check_query_state();
return state->check_query_state(msg);
}

}
20 changes: 14 additions & 6 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "util/blocking_queue.hpp"
#include "runtime/bufferpool/buffer_pool.h"
#include "runtime/query_statistics.h"
#include "service/backend_options.h"

namespace llvm {
class Function;
Expand Down Expand Up @@ -387,22 +388,29 @@ class ExecNode {
/// Nodes may override this to add extra periodic cleanup, e.g. freeing other local
/// allocations. ExecNodes overriding this function should return
/// ExecNode::QueryMaintenance().
virtual Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
virtual Status QueryMaintenance(RuntimeState* state, const std::string& msg) WARN_UNUSED_RESULT;

private:
bool _is_closed;
};

/// Builds a "Memory exceed limit." error message and returns
/// Status::MemoryLimitExceeded(...) from the *calling* function, so it may only
/// be used inside functions whose return type is Status.
///   tracker: pointer expression; its consumption() and limit() are reported.
///   state:   pointer expression; its fragment_instance_id() is reported.
///   msg:     any streamable expression describing what was executing.
/// The message also names the local backend (BackendOptions::get_localhost()).
/// Every argument is parenthesized so compound expressions (e.g. `a ? b : c`)
/// expand as intended. Note that `tracker` is evaluated twice.
#define LIMIT_EXCEEDED(tracker, state, msg) \
    do { \
        std::stringstream str; \
        str << "Memory exceed limit. " << (msg) << " "; \
        str << "Backend: " << BackendOptions::get_localhost() << ", "; \
        str << "fragment: " << print_id((state)->fragment_instance_id()) << " "; \
        str << "Used: " << (tracker)->consumption() << ", Limit: " << (tracker)->limit() << ". "; \
        str << "You can change the limit by session variable exec_mem_limit."; \
        return Status::MemoryLimitExceeded(str.str()); \
    } while (false)

#define RETURN_IF_LIMIT_EXCEEDED(state, msg) \
do { \
/* if (UNLIKELY(MemTracker::limit_exceeded(*(state)->mem_trackers()))) { */ \
MemTracker* tracker = state->instance_mem_tracker()->find_limit_exceeded_tracker(); \
if (tracker != nullptr) { \
stringstream str; \
str << "Memory exceed limit. " << msg << " "; \
str << "Used: " << tracker->consumption() << ", Limit: " << tracker->limit() << ". "; \
str << "You can change the limit by session variable exec_mem_limit."; \
return Status::MemoryLimitExceeded(str.str()); \
LIMIT_EXCEEDED(tracker, state, msg); \
} \
} while (false)
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
SCOPED_TIMER(_build_timer);
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
RETURN_IF_LIMIT_EXCEEDED(state,
"Hash join was constructing the hash table.");
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");

// Call codegen version if possible
if (_process_build_batch_fn == NULL) {
Expand Down
19 changes: 13 additions & 6 deletions be/src/exec/new_partitioned_aggregation_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ Status NewPartitionedAggregationNode::open(RuntimeState* state) {
bool eos = false;
do {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state(
"New partitioned aggregation, while getting next from child 0."));
RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
if (UNLIKELY(VLOG_ROW_IS_ON)) {
for (int i = 0; i < batch.num_rows(); ++i) {
Expand Down Expand Up @@ -407,7 +408,7 @@ Status NewPartitionedAggregationNode::GetNextInternal(RuntimeState* state,
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("New partitioned aggregation, while getting next."));
// clear temporary expr result allocations
expr_results_pool_->clear();

Expand Down Expand Up @@ -486,7 +487,8 @@ Status NewPartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
// maintenance every N iterations.
if ((count++ & (N - 1)) == 0) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state(
"New partitioned aggregation, while getting rows from partition."));
}

int row_idx = row_batch->add_row();
Expand Down Expand Up @@ -527,7 +529,8 @@ Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
do {
DCHECK_EQ(out_batch->num_rows(), 0);
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state(
"New partitioned aggregation, while getting rows in streaming."));

RETURN_IF_ERROR(child(0)->get_next(state, child_batch_.get(), &child_eos_));
SCOPED_TIMER(streaming_timer_);
Expand Down Expand Up @@ -834,7 +837,11 @@ Status NewPartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows)
DCHECK(!parent->is_streaming_preagg_);
DCHECK(!is_closed);
DCHECK(!is_spilled());
RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker()));
// TODO(ml): enable spill
std::stringstream msg;
msg << "New partitioned Aggregation in spill";
LIMIT_EXCEEDED(parent->mem_tracker(), parent->state_, msg.str());
// RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker()));

RETURN_IF_ERROR(SerializeStreamForSpilling());

Expand Down Expand Up @@ -1345,7 +1352,7 @@ Status NewPartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_
RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
RETURN_IF_ERROR(
ProcessBatch<AGGREGATED_ROWS>(&batch, ht_ctx_.get()));
RETURN_IF_ERROR(state_->check_query_state());
RETURN_IF_ERROR(state_->check_query_state("New partitioned aggregation, while processing stream."));
batch.reset();
} while (!eos);
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/new_partitioned_hash_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "codegen/codegen_anyval.h"
#include "codegen/llvm_codegen.h"
#include "exec/exec_node.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "exprs/slot_ref.h"
Expand Down Expand Up @@ -118,7 +119,7 @@ Status NewPartitionedHashTableCtx::Init(ObjectPool* pool, RuntimeState* state, i
int scratch_row_size = sizeof(Tuple*) * num_build_tuples;
scratch_row_ = reinterpret_cast<TupleRow*>(malloc(scratch_row_size));
if (UNLIKELY(scratch_row_ == NULL)) {
return Status::InternalError(Substitute("Failed to allocate $0 bytes for scratch row of "
return Status::InternalError(Substitute("Failed to allocate $0 bytes for scratch row of "
"NewPartitionedHashTableCtx.", scratch_row_size));
}

Expand Down
9 changes: 3 additions & 6 deletions be/src/exec/partitioned_aggregation_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) {
bool eos = false;
do {
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while getting next from child 0."));
RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));

if (UNLIKELY(VLOG_ROW_IS_ON)) {
Expand Down Expand Up @@ -277,8 +276,7 @@ Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_b
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, before evaluating conjuncts."));

if (reached_limit()) {
*eos = true;
Expand Down Expand Up @@ -335,8 +333,7 @@ Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_b
// maintenance every N iterations.
if ((count++ & (N - 1)) == 0) {
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while evaluating conjuncts."));
}

int row_idx = row_batch->add_row();
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/sort_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ Status SortNode::sort_input(RuntimeState* state) {
RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos));
RETURN_IF_ERROR(_sorter->add_batch(&batch));
RETURN_IF_CANCELLED(state);
RETURN_IF_LIMIT_EXCEEDED(state,
"Sort node was getting next from the child.");
RETURN_IF_LIMIT_EXCEEDED(state, "Sort, while getting next from the child.");
// RETURN_IF_ERROR(QueryMaintenance(state));
} while (!eos);
RETURN_IF_ERROR(_sorter->input_done());
Expand Down
9 changes: 3 additions & 6 deletions be/src/exec/spill_sort_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ Status SpillSortNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(_sort_exec_exprs.open(state));
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Spill sort, while open."));
RETURN_IF_ERROR(child(0)->open(state));

// These objects must be created after opening the _sort_exec_exprs. Avoid creating
Expand Down Expand Up @@ -88,8 +87,7 @@ Status SpillSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
// RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT, state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Spill sort, while getting next."));

if (reached_limit()) {
*eos = true;
Expand Down Expand Up @@ -166,8 +164,7 @@ Status SpillSortNode::sort_input(RuntimeState* state) {
RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos));
RETURN_IF_ERROR(_sorter->add_batch(&batch));
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Spill sort, while sorting input."));
} while (!eos);

RETURN_IF_ERROR(_sorter->input_done());
Expand Down
9 changes: 3 additions & 6 deletions be/src/exec/topn_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ Status TopNNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Top n, before open."));
RETURN_IF_ERROR(_sort_exec_exprs.open(state));

// Avoid creating them after every Reset()/Open().
Expand Down Expand Up @@ -122,8 +121,7 @@ Status TopNNode::open(RuntimeState* state) {
insert_tuple_row(batch.get_row(i));
}
RETURN_IF_CANCELLED(state);
// RETURN_IF_LIMIT_EXCEEDED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Top n, while getting next from child 0."));
} while (!eos);
}

Expand All @@ -142,8 +140,7 @@ Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
// RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Top n, before moving result to row_batch."));

while (!row_batch->at_capacity() && (_get_next_iter != _sorted_top_n.end())) {
if (_num_rows_skipped < _offset) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/runtime/buffered_block_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "exec/exec_node.h"
#include "runtime/buffered_block_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/mem_pool.h"
Expand Down Expand Up @@ -76,9 +77,7 @@ Status BufferedBlockMgr::get_new_block(Block** block, int64_t len) {
new_block->_buffer_desc->block = new_block;
*block = new_block;

if (UNLIKELY(_state->instance_mem_tracker()->any_limit_exceeded())) {
return Status::MemoryLimitExceeded("Memory limit exceeded");
}
RETURN_IF_LIMIT_EXCEEDED(_state, "Buffered block mgr, while getting new block");

return Status::OK();
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/runtime/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//#include <boost/shared_ptr.hpp>
//include <boost/weak_ptr.hpp>

#include "exec/exec_node.h"
#include "gutil/strings/substitute.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -320,9 +321,7 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta
// }
// ss << tracker_to_log->LogUsage();
// Status status = Status::MemLimitExceeded(ss.str());
Status status = Status::MemoryLimitExceeded("Memory limit exceeded");
if (state != nullptr) state->log_error(status.get_error_msg());
return status;
LIMIT_EXCEEDED(this, state, ss.str());
}

void MemTracker::AddGcFunction(GcFunction f) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ void PlanFragmentExecutor::update_status(const Status& status) {

if (_status.ok()) {
if (status.is_mem_limit_exceeded()) {
_runtime_state->set_mem_limit_exceeded();
_runtime_state->set_mem_limit_exceeded(status.get_error_msg());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_runtime_state->set_mem_limit_exceeded(status.get_error_msg());
_runtime_state->set_mem_limit_exceeded(status.to_string());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_mem_limit_exceeded already sets the MemoryLimitExceeded status code, so only the error message needs to be passed in here.

}
_status = status;
}
Expand Down
7 changes: 3 additions & 4 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "codegen/llvm_codegen.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "exec/exec_node.h"
#include "exprs/expr.h"
#include "exprs/timezone_db.h"
#include "runtime/buffered_block_mgr.h"
Expand Down Expand Up @@ -396,12 +397,10 @@ Status RuntimeState::set_mem_limit_exceeded(
return _process_status;
}

Status RuntimeState::check_query_state() {
Status RuntimeState::check_query_state(const std::string& msg) {
// TODO: it would be nice if this also checked for cancellation, but doing so breaks
// cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached.
if (_instance_mem_tracker->any_limit_exceeded()) {
return set_mem_limit_exceeded();
}
RETURN_IF_LIMIT_EXCEEDED(this, msg);
return query_status();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class RuntimeState {
// Returns a non-OK status if query execution should stop (e.g., the query was cancelled
// or a mem limit was exceeded). Exec nodes should check this periodically so execution
// doesn't continue if the query terminates abnormally.
Status check_query_state();
Status check_query_state(const std::string& msg);

std::vector<std::string>& output_files() {
return _output_files;
Expand Down