6 changes: 6 additions & 0 deletions be/src/common/atomic.h
@@ -201,6 +201,12 @@ class AtomicPtr {
/// Atomic store with "release" memory-ordering semantic.
inline void store(T* val) { _ptr.store(reinterpret_cast<intptr_t>(val)); }

/// Store 'val' and return the previous value. Implies a "release" memory barrier
/// (i.e. the same as store()).
inline T* swap(T* val) {
return reinterpret_cast<T*>(_ptr.swap(reinterpret_cast<intptr_t>(val)));
}

private:
AtomicInt<intptr_t> _ptr;
};
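
For orientation, a minimal usage sketch of the new swap(): one thread publishes a pointer with store() and a consumer atomically takes it while leaving nullptr behind, so the object is deleted exactly once. The Node type and both functions are illustrative only, not part of this patch.

    struct Node { int payload; };

    doris::AtomicPtr<Node> published;       // assumes be/src/common/atomic.h is included

    void publish(Node* n) {
        published.store(n);                 // release-store: 'n' becomes visible to readers
    }

    void consume() {
        Node* n = published.swap(nullptr);  // atomically take the pointer, leave nullptr
        if (n != nullptr) {
            delete n;                       // only the thread that won the swap deletes it
        }
    }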
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -544,6 +544,8 @@ namespace config {
// Whether to continue to start be when load tablet from header failed.
CONF_Bool(ignore_rowset_stale_unconsistent_delete, "false");

// Soft memory limit as a fraction of hard memory limit.
CONF_Double(soft_mem_limit_frac, "0.9");
} // namespace config

} // namespace doris
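
The new option only defines a ratio; below is a minimal sketch of the intended arithmetic, assuming the BE derives its soft limit by scaling the hard (process) limit by soft_mem_limit_frac. The helper is hypothetical, not code from this patch.

    #include <algorithm>
    #include <cstdint>

    // Hypothetical helper: soft limit = hard limit * config::soft_mem_limit_frac,
    // clamped so it never exceeds the hard limit itself.
    int64_t derive_soft_mem_limit(int64_t hard_limit_bytes, double frac) {
        int64_t soft = static_cast<int64_t>(hard_limit_bytes * frac);
        return std::min(soft, hard_limit_bytes);
    }

    // Example: an 8 GiB hard limit with the default 0.9 yields a ~7.2 GiB soft limit,
    // letting soft-limit checks apply back-pressure before the hard limit is reached.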
6 changes: 6 additions & 0 deletions be/src/common/logging.h
@@ -61,4 +61,10 @@
#define VLOG_ROW_IS_ON VLOG_IS_ON(3)
#define VLOG_PROGRESS_IS_ON VLOG_IS_ON(2)

/// Define a wrapper around DCHECK for strongly typed enums that print a useful error
/// message on failure.
#define DCHECK_ENUM_EQ(a, b) \
DCHECK(a == b) << "[ " #a " = " << static_cast<int>(a) << " , " #b " = " \
<< static_cast<int>(b) << " ]"

#endif
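
A short usage sketch of the new macro; the enum and function below are illustrative only. Unlike a plain DCHECK_EQ, which cannot stream a scoped enum, DCHECK_ENUM_EQ casts both operands to int so the failure message shows the actual values.

    enum class ScanState { INIT = 0, RUNNING = 1, DONE = 2 };

    void finish_scan(ScanState state) {
        // On failure this logs e.g. "[ state = 1 , ScanState::DONE = 2 ]".
        DCHECK_ENUM_EQ(state, ScanState::DONE);
    }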
8 changes: 4 additions & 4 deletions be/src/exec/aggregation_node.cpp
@@ -113,7 +113,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Expr::prepare(
_build_expr_ctxs, state, build_row_desc, expr_mem_tracker()));

_tuple_pool.reset(new MemPool(mem_tracker()));
_tuple_pool.reset(new MemPool(mem_tracker().get()));

_agg_fn_ctxs.resize(_aggregate_evaluators.size());
int j = _probe_expr_ctxs.size();
@@ -128,8 +128,8 @@ Status AggregationNode::prepare(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, child(0)->row_desc(), _tuple_pool.get(),
intermediate_slot_desc, output_slot_desc, mem_tracker(), &_agg_fn_ctxs[i]));
state, child(0)->row_desc(), _tuple_pool.get(), intermediate_slot_desc,
output_slot_desc, mem_tracker(), &_agg_fn_ctxs[i]));
state->obj_pool()->add(_agg_fn_ctxs[i]);
}

@@ -160,7 +160,7 @@ Status AggregationNode::open(RuntimeState* state) {

RETURN_IF_ERROR(_children[0]->open(state));

RowBatch batch(_children[0]->row_desc(), state->batch_size(), mem_tracker());
RowBatch batch(_children[0]->row_desc(), state->batch_size(), mem_tracker().get());
int64_t num_input_rows = 0;
int64_t num_agg_rows = 0;

22 changes: 11 additions & 11 deletions be/src/exec/analytic_eval_node.cpp
@@ -146,18 +146,18 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
_child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0];
_curr_tuple_pool.reset(new MemPool(mem_tracker()));
_prev_tuple_pool.reset(new MemPool(mem_tracker()));
_mem_pool.reset(new MemPool(mem_tracker()));
_curr_tuple_pool.reset(new MemPool(mem_tracker().get()));
_prev_tuple_pool.reset(new MemPool(mem_tracker().get()));
_mem_pool.reset(new MemPool(mem_tracker().get()));

_evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime");
DCHECK_EQ(_result_tuple_desc->slots().size(), _evaluators.size());

for (int i = 0; i < _evaluators.size(); ++i) {
doris_udf::FunctionContext* ctx;
RETURN_IF_ERROR(_evaluators[i]->prepare(state, child(0)->row_desc(), _mem_pool.get(),
_intermediate_tuple_desc->slots()[i], _result_tuple_desc->slots()[i],
mem_tracker(), &ctx));
RETURN_IF_ERROR(_evaluators[i]->prepare(
state, child(0)->row_desc(), _mem_pool.get(), _intermediate_tuple_desc->slots()[i],
_result_tuple_desc->slots()[i], mem_tracker(), &ctx));
_fn_ctxs.push_back(ctx);
state->obj_pool()->add(ctx);
}
@@ -171,13 +171,13 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {

if (_partition_by_eq_expr_ctx != NULL) {
RETURN_IF_ERROR(
_partition_by_eq_expr_ctx->prepare(state, cmp_row_desc, expr_mem_tracker()));
_partition_by_eq_expr_ctx->prepare(state, cmp_row_desc, expr_mem_tracker()));
//AddExprCtxToFree(_partition_by_eq_expr_ctx);
}

if (_order_by_eq_expr_ctx != NULL) {
RETURN_IF_ERROR(
_order_by_eq_expr_ctx->prepare(state, cmp_row_desc, expr_mem_tracker()));
_order_by_eq_expr_ctx->prepare(state, cmp_row_desc, expr_mem_tracker()));
//AddExprCtxToFree(_order_by_eq_expr_ctx);
}
}
@@ -238,8 +238,8 @@ Status AnalyticEvalNode::open(RuntimeState* state) {

// Fetch the first input batch so that some _prev_input_row can be set here to avoid
// special casing in GetNext().
_prev_child_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
_curr_child_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
_prev_child_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()));
_curr_child_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()));

while (!_input_eos && _prev_input_row == NULL) {
RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos));
@@ -744,7 +744,7 @@ Status AnalyticEvalNode::get_next_output_batch(RuntimeState* state, RowBatch* ou
ExprContext** ctxs = &_conjunct_ctxs[0];
int num_ctxs = _conjunct_ctxs.size();

RowBatch input_batch(child(0)->row_desc(), output_batch->capacity(), mem_tracker());
RowBatch input_batch(child(0)->row_desc(), output_batch->capacity(), mem_tracker().get());
int64_t stream_idx = _input_stream->rows_returned();
RETURN_IF_ERROR(_input_stream->get_next(&input_batch, eos));

32 changes: 17 additions & 15 deletions be/src/exec/base_scanner.cpp
@@ -27,23 +27,25 @@

namespace doris {

BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, ScannerCounter* counter) :
_state(state), _params(params), _counter(counter),
_src_tuple(nullptr),
_src_tuple_row(nullptr),
BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params, ScannerCounter* counter)
: _state(state),
_params(params),
_counter(counter),
_src_tuple(nullptr),
_src_tuple_row(nullptr),
#if BE_TEST
_mem_tracker(new MemTracker()),
_mem_pool(_mem_tracker.get()),
_mem_tracker(new MemTracker()),
#else
_mem_tracker(new MemTracker(-1, "Broker Scanner", state->instance_mem_tracker())),
_mem_pool(_state->instance_mem_tracker()),
_mem_tracker(MemTracker::CreateTracker(-1, "Broker Scanner", state->instance_mem_tracker())),
#endif
_dest_tuple_desc(nullptr),
_strict_mode(false),
_profile(profile),
_rows_read_counter(nullptr),
_read_timer(nullptr),
_materialize_timer(nullptr) {
_mem_pool(_mem_tracker.get()),
_dest_tuple_desc(nullptr),
_strict_mode(false),
_profile(profile),
_rows_read_counter(nullptr),
_read_timer(nullptr),
_materialize_timer(nullptr) {
}

Status BaseScanner::open() {
@@ -113,7 +115,7 @@ Status BaseScanner::init_expr_ctxes() {
}
ExprContext* ctx = nullptr;
RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker.get()));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
if (has_slot_id_map) {
2 changes: 1 addition & 1 deletion be/src/exec/base_scanner.h
@@ -76,7 +76,7 @@ class BaseScanner {
Tuple* _src_tuple;
TupleRow* _src_tuple_row;

std::unique_ptr<MemTracker> _mem_tracker;
std::shared_ptr<MemTracker> _mem_tracker;
// Mem pool used to allocate _src_tuple and _src_tuple_row
MemPool _mem_pool;

4 changes: 2 additions & 2 deletions be/src/exec/blocking_join_node.cpp
@@ -50,7 +50,7 @@ Status BlockingJoinNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));

_build_pool.reset(new MemPool(mem_tracker()));
_build_pool.reset(new MemPool(mem_tracker().get()));
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_left_child_timer = ADD_TIMER(runtime_profile(), "LeftChildTime");
_build_row_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
@@ -74,7 +74,7 @@ Status BlockingJoinNode::prepare(RuntimeState* state) {
_probe_tuple_row_size = num_left_tuples * sizeof(Tuple*);
_build_tuple_row_size = num_build_tuples * sizeof(Tuple*);

_left_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
_left_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()));
return Status::OK();
}

4 changes: 2 additions & 2 deletions be/src/exec/broker_scan_node.cpp
@@ -325,7 +325,7 @@ Status BrokerScanNode::scanner_scan(
while (!scanner_eof) {
// Fill one row batch
std::shared_ptr<RowBatch> row_batch(
new RowBatch(row_desc(), _runtime_state->batch_size(), mem_tracker()));
new RowBatch(row_desc(), _runtime_state->batch_size(), mem_tracker().get()));

// create new tuple buffer for row_batch
MemPool* tuple_pool = row_batch->tuple_data_pool();
@@ -382,7 +382,7 @@ Status BrokerScanNode::scanner_scan(
// 1. too many batches in queue, or
// 2. at least one batch in queue and memory exceed limit.
(_batch_queue.size() >= _max_buffered_batches
|| (mem_tracker()->any_limit_exceeded() && !_batch_queue.empty()))) {
|| (mem_tracker()->AnyLimitExceeded(MemLimit::HARD) && !_batch_queue.empty()))) {
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
// Process already set failed, so we just return OK
2 changes: 1 addition & 1 deletion be/src/exec/cross_join_node.cpp
@@ -57,7 +57,7 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) {

while (true) {
RowBatch* batch = _build_batch_pool->add(
new RowBatch(child(1)->row_desc(), state->batch_size(), mem_tracker()));
new RowBatch(child(1)->row_desc(), state->batch_size(), mem_tracker().get()));

RETURN_IF_CANCELLED(state);
// TODO(zhaochun):
2 changes: 1 addition & 1 deletion be/src/exec/csv_scan_node.cpp
@@ -209,7 +209,7 @@ Status CsvScanNode::prepare(RuntimeState* state) {
return Status::InternalError("new a csv scanner failed.");
}

_tuple_pool.reset(new(std::nothrow) MemPool(state->instance_mem_tracker()));
_tuple_pool.reset(new(std::nothrow) MemPool(state->instance_mem_tracker().get()));
if (_tuple_pool.get() == nullptr) {
return Status::InternalError("new a mem pool failed.");
}
3 changes: 2 additions & 1 deletion be/src/exec/data_sink.cpp
@@ -152,7 +152,8 @@ Status DataSink::init(const TDataSink& thrift_sink) {
}

Status DataSink::prepare(RuntimeState* state) {
_expr_mem_tracker.reset(new MemTracker(-1, "Data sink", state->instance_mem_tracker()));
_expr_mem_tracker = MemTracker::CreateTracker(-1, std::string("DataSink:") + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
return Status::OK();
}
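
The same pattern recurs throughout this change: trackers are created through the MemTracker::CreateTracker factory and held as shared_ptr, raw-pointer consumers such as MemPool and RowBatch receive tracker.get(), and tear-down is simply dropping the reference. A condensed sketch under those assumptions (illustrative, not the literal DataSink code):

    std::shared_ptr<MemTracker> _expr_mem_tracker;

    Status prepare(RuntimeState* state) {
        _expr_mem_tracker = MemTracker::CreateTracker(
                -1,                                                  // no explicit byte limit
                "DataSink:" + std::to_string(state->load_job_id()),  // label shown in profiles
                state->instance_mem_tracker());                      // parent tracker
        return Status::OK();
    }

    Status close(RuntimeState* state, Status exec_status) {
        _expr_mem_tracker.reset();  // replaces the old explicit MemTracker::close() call
        return Status::OK();
    }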

4 changes: 2 additions & 2 deletions be/src/exec/data_sink.h
@@ -63,7 +63,7 @@ class DataSink {
// It must be okay to call this multiple times. Subsequent calls should
// be ignored.
virtual Status close(RuntimeState* state, Status exec_status) {
_expr_mem_tracker->close();
_expr_mem_tracker.reset();
_closed = true;
return Status::OK();
}
@@ -86,7 +86,7 @@ class DataSink {
// Set to true after close() has been called. subclasses should check and set this in
// close().
bool _closed;
std::unique_ptr<MemTracker> _expr_mem_tracker;
std::shared_ptr<MemTracker> _expr_mem_tracker;

// Maybe this will be transferred to BufferControlBlock.
std::shared_ptr<QueryStatistics> _query_statistics;
2 changes: 1 addition & 1 deletion be/src/exec/es_http_scan_node.cpp
@@ -317,7 +317,7 @@ Status EsHttpScanNode::scanner_scan(
while (!scanner_eof) {
// Fill one row batch
std::shared_ptr<RowBatch> row_batch(
new RowBatch(row_desc(), _runtime_state->batch_size(), mem_tracker()));
new RowBatch(row_desc(), _runtime_state->batch_size(), mem_tracker().get()));

// create new tuple buffer for row_batch
MemPool* tuple_pool = row_batch->tuple_data_pool();
54 changes: 25 additions & 29 deletions be/src/exec/es_http_scanner.cpp
@@ -30,37 +30,33 @@

namespace doris {

EsHttpScanner::EsHttpScanner(
RuntimeState* state,
RuntimeProfile* profile,
TupleId tuple_id,
const std::map<std::string, std::string>& properties,
const std::vector<ExprContext*>& conjunct_ctxs,
EsScanCounter* counter,
bool doc_value_mode) :
_state(state),
_profile(profile),
_tuple_id(tuple_id),
_properties(properties),
_conjunct_ctxs(conjunct_ctxs),
_next_range(0),
_line_eof(false),
_batch_eof(false),
EsHttpScanner::EsHttpScanner(RuntimeState* state, RuntimeProfile* profile, TupleId tuple_id,
const std::map<std::string, std::string>& properties,
const std::vector<ExprContext*>& conjunct_ctxs, EsScanCounter* counter,
bool doc_value_mode)
: _state(state),
_profile(profile),
_tuple_id(tuple_id),
_properties(properties),
_conjunct_ctxs(conjunct_ctxs),
_next_range(0),
_line_eof(false),
_batch_eof(false),
#if BE_TEST
_mem_tracker(new MemTracker()),
_mem_pool(_mem_tracker.get()),
#else
_mem_tracker(new MemTracker(-1, "EsHttp Scanner", state->instance_mem_tracker())),
_mem_pool(_state->instance_mem_tracker()),
_mem_tracker(new MemTracker()),
#else
_mem_tracker(
MemTracker::CreateTracker(-1, "EsHttp Scanner", state->instance_mem_tracker())),
#endif
_tuple_desc(nullptr),
_counter(counter),
_es_reader(nullptr),
_es_scroll_parser(nullptr),
_doc_value_mode(doc_value_mode),
_rows_read_counter(nullptr),
_read_timer(nullptr),
_materialize_timer(nullptr) {
_mem_pool(_mem_tracker.get()),
_tuple_desc(nullptr),
_counter(counter),
_es_reader(nullptr),
_es_scroll_parser(nullptr),
_doc_value_mode(doc_value_mode),
_rows_read_counter(nullptr),
_read_timer(nullptr),
_materialize_timer(nullptr) {
}

EsHttpScanner::~EsHttpScanner() {
2 changes: 1 addition & 1 deletion be/src/exec/es_http_scanner.h
@@ -87,7 +87,7 @@ class EsHttpScanner {
std::vector<SlotDescriptor*> _slot_descs;
std::unique_ptr<RowDescriptor> _row_desc;

std::unique_ptr<MemTracker> _mem_tracker;
std::shared_ptr<MemTracker> _mem_tracker;
MemPool _mem_pool;

const TupleDescriptor* _tuple_desc;
2 changes: 1 addition & 1 deletion be/src/exec/except_node.cpp
@@ -73,7 +73,7 @@ Status ExceptNode::open(RuntimeState* state) {
temp_tbl->close();
}
// probe
_probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
_probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker().get()));
ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
RETURN_IF_ERROR(child(i)->open(state));
eos = false;