Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ void ExecNode::try_do_aggregate_serde_improve() {
void ExecNode::init_runtime_profile(const std::string& name) {
std::stringstream ss;
ss << name << " (id=" << _id << ")";
_runtime_profile.reset(new RuntimeProfile(_pool, ss.str()));
_runtime_profile.reset(new RuntimeProfile(ss.str()));
_runtime_profile->set_metadata(_id);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
_num_senders = state->num_per_fragment_instances();

// profile must add to state's object pool
_profile = state->obj_pool()->add(new RuntimeProfile(_pool, "OlapTableSink"));
_profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
_mem_tracker = _pool->add(new MemTracker(-1, "OlapTableSink", state->instance_mem_tracker()));

SCOPED_TIMER(_profile->total_time_counter());
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/buffered_block_mgr2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ void BufferedBlockMgr2::init(

io_mgr->register_context(&_io_request_context);

_profile.reset(new RuntimeProfile(&_obj_pool, "BlockMgr"));
_profile.reset(new RuntimeProfile("BlockMgr"));
parent_profile->add_child(_profile.get(), true, NULL);

_block_size_counter = ADD_COUNTER(_profile.get(), "MaxBlockSize", TUnit::BYTES);
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/data_spliter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Status DataSpliter::prepare(RuntimeState* state) {
for (auto& iter : _rollup_map) {
RETURN_IF_ERROR(iter.second->prepare(state, _row_desc, _expr_mem_tracker.get()));
}
_profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str()));
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
for (auto iter : _partition_infos) {
RETURN_IF_ERROR(iter->prepare(state, _row_desc, _expr_mem_tracker.get()));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/data_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ Status DataStreamSender::prepare(RuntimeState* state) {
_state = state;
std::stringstream title;
title << "DataStreamSender (dst_id=" << _dest_node_id << ")";
_profile = _pool->add(new RuntimeProfile(_pool, title.str()));
_profile = _pool->add(new RuntimeProfile(title.str()));
SCOPED_TIMER(_profile->total_time_counter());
_mem_tracker.reset(
new MemTracker(_profile, -1, "DataStreamSender", state->instance_mem_tracker()));
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/dpp_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ Status Translator::create_profile(RuntimeState* state) {
std::stringstream ss;
ss << "Dpp translator(" << _tablet_desc.partition_id << "_" << _tablet_desc.bucket_id
<< "_" << _rollup_name << ")";
_profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), ss.str()));
_profile = state->obj_pool()->add(new RuntimeProfile(ss.str()));

_add_batch_timer = ADD_TIMER(_profile, "add batch time");
_sort_timer = ADD_TIMER(_profile, "sort time");
Expand Down Expand Up @@ -875,7 +875,7 @@ Status Translator::close(RuntimeState* state) {
}

Status DppSink::init(RuntimeState* state) {
_profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), "Dpp sink"));
_profile = state->obj_pool()->add(new RuntimeProfile("Dpp sink"));
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/export_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status ExportSink::prepare(RuntimeState* state) {
std::stringstream title;
title << "ExportSink (frag_id=" << state->fragment_instance_id() << ")";
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str()));
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
SCOPED_TIMER(_profile->total_time_counter());

_mem_tracker.reset(new MemTracker(-1, "ExportSink", state->instance_mem_tracker()));
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory_scratch_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status MemoryScratchSink::prepare(RuntimeState* state) {
std::stringstream title;
title << "MemoryScratchSink (frag_id=" << fragment_instance_id << ")";
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str()));
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));

return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/mysql_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Status MysqlTableSink::prepare(RuntimeState* state) {
std::stringstream title;
title << "MysqlTableSink (frag_id=" << state->fragment_instance_id() << ")";
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str()));
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
return Status::OK();
}

Expand Down
17 changes: 13 additions & 4 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,12 @@ Status PlanFragmentExecutor::open_internal() {
{
SCOPED_TIMER(profile()->total_time_counter());
collect_query_statistics();
Status status = _sink->close(runtime_state(), _status);
Status status;
{
boost::lock_guard<boost::mutex> l(_status_lock);
status = _status;
}
status = _sink->close(runtime_state(), status);
RETURN_IF_ERROR(status);
}

Expand Down Expand Up @@ -489,8 +494,7 @@ void PlanFragmentExecutor::update_status(const Status& status) {

{
boost::lock_guard<boost::mutex> l(_status_lock);

if (_status.ok()) {
if (!_status.ok()) {
if (status.is_mem_limit_exceeded()) {
_runtime_state->set_mem_limit_exceeded(status.get_error_msg());
}
Expand Down Expand Up @@ -547,7 +551,12 @@ void PlanFragmentExecutor::close() {

if (_sink.get() != NULL) {
if (_prepared) {
_sink->close(runtime_state(), _status);
Status status;
{
boost::lock_guard<boost::mutex> l(_status_lock);
status = _status;
}
_sink->close(runtime_state(), status);
} else {
_sink->close(runtime_state(), Status::InternalError("prepare failed"));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/result_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status ResultSink::prepare(RuntimeState* state) {
std::stringstream title;
title << "DataBufferSender (dst_fragment_instance_id=" << print_id(state->fragment_instance_id()) << ")";
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str()));
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
// prepare output_expr
RETURN_IF_ERROR(prepare_exprs(state));

Expand Down
9 changes: 4 additions & 5 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ RuntimeState::RuntimeState(
const TUniqueId& fragment_instance_id,
const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env) :
_profile("Fragment " + print_id(fragment_instance_id)),
_obj_pool(new ObjectPool()),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_profile(_obj_pool.get(), "Fragment " + print_id(fragment_instance_id)),
_fragment_mem_tracker(NULL),
_is_cancelled(false),
_per_fragment_instance_idx(0),
Expand All @@ -76,12 +76,11 @@ RuntimeState::RuntimeState(
const TExecPlanFragmentParams& fragment_params,
const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env) :
_profile("Fragment " + print_id(fragment_params.params.fragment_instance_id)),
_obj_pool(new ObjectPool()),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(fragment_params.params.query_id),
_profile(_obj_pool.get(),
"Fragment " + print_id(fragment_params.params.fragment_instance_id)),
_fragment_mem_tracker(NULL),
_is_cancelled(false),
_per_fragment_instance_idx(0),
Expand All @@ -100,10 +99,10 @@ RuntimeState::RuntimeState(
}

RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
: _obj_pool(new ObjectPool()),
: _profile("<unnamed>"),
_obj_pool(new ObjectPool()),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_profile(_obj_pool.get(), "<unnamed>"),
_is_cancelled(false),
_per_fragment_instance_idx(0) {
_query_options.batch_size = DEFAULT_BATCH_SIZE;
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ class RuntimeState {

static const int DEFAULT_BATCH_SIZE = 2048;

// put runtime state before _obj_pool, so that it will be deconstructed after
// _obj_pool. Because some of object in _obj_pool will use profile when deconstructing.
RuntimeProfile _profile;

DescriptorTbl* _desc_tbl;
std::shared_ptr<ObjectPool> _obj_pool;

Expand Down Expand Up @@ -542,8 +546,6 @@ class RuntimeState {
// state is responsible for returning this pool to the thread mgr.
ThreadResourceMgr::ResourcePool* _resource_pool;

RuntimeProfile _profile;

// all mem limits that apply to this query
std::vector<MemTracker*> _mem_trackers;

Expand Down
2 changes: 1 addition & 1 deletion be/src/util/dummy_runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
namespace doris {
class DummyProfile {
public:
DummyProfile() : _pool(), _profile(new RuntimeProfile(&_pool, "dummy", false)) {}
DummyProfile() : _pool(), _profile(new RuntimeProfile("dummy", false)) {}
RuntimeProfile* profile() { return _profile; }
virtual ~DummyProfile() {
delete _profile;
Expand Down
46 changes: 4 additions & 42 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ static const std::string ROOT_COUNTER = "";

RuntimeProfile::PeriodicCounterUpdateState RuntimeProfile::_s_periodic_counter_update_state;

// TODO: we do not use the param pool should we del the param ObjectPool
RuntimeProfile::RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile)
RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile)
: _pool(new ObjectPool()),
_own_pool(false),
_name(name),
Expand Down Expand Up @@ -77,43 +76,6 @@ RuntimeProfile::~RuntimeProfile() {
}
}

RuntimeProfile* RuntimeProfile::create_from_thrift(ObjectPool* pool,
const TRuntimeProfileTree& profiles) {
if (profiles.nodes.size() == 0) {
return NULL;
}

int idx = 0;
return RuntimeProfile::create_from_thrift(pool, profiles.nodes, &idx);
}

RuntimeProfile* RuntimeProfile::create_from_thrift(ObjectPool* pool,
const std::vector<TRuntimeProfileNode>& nodes,
int* idx) {
DCHECK_LT(*idx, nodes.size());

const TRuntimeProfileNode& node = nodes[*idx];
RuntimeProfile* profile = pool->add(new RuntimeProfile(pool, node.name));
profile->_metadata = node.metadata;

for (int i = 0; i < node.counters.size(); ++i) {
const TCounter& counter = node.counters[i];
profile->_counter_map[counter.name] = pool->add(new Counter(counter.type, counter.value));
}

profile->_child_counter_map = node.child_counters_map;
profile->_info_strings = node.info_strings;
profile->_info_strings_display_order = node.info_strings_display_order;

++*idx;

for (int i = 0; i < node.num_children; ++i) {
profile->add_child(RuntimeProfile::create_from_thrift(pool, nodes, idx), false, NULL);
}

return profile;
}

void RuntimeProfile::merge(RuntimeProfile* other) {
DCHECK(other != NULL);

Expand Down Expand Up @@ -168,7 +130,7 @@ void RuntimeProfile::merge(RuntimeProfile* other) {
if (j != _child_map.end()) {
child = j->second;
} else {
child = _pool->add(new RuntimeProfile(_pool.get(), other_child->_name));
child = _pool->add(new RuntimeProfile(other_child->_name));
child->_local_time_percent = other_child->_local_time_percent;
child->_metadata = other_child->_metadata;
bool indent_other_child = other->_children[i].second;
Expand Down Expand Up @@ -257,7 +219,7 @@ void RuntimeProfile::update(const std::vector<TRuntimeProfileNode>& nodes, int*
if (j != _child_map.end()) {
child = j->second;
} else {
child = _pool->add(new RuntimeProfile(_pool.get(), tchild.name));
child = _pool->add(new RuntimeProfile(tchild.name));
child->_metadata = tchild.metadata;
_child_map[tchild.name] = child;
_children.push_back(std::make_pair(child, tchild.indent));
Expand Down Expand Up @@ -325,7 +287,7 @@ void RuntimeProfile::compute_time_in_profile(int64_t total) {
RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool indent, bool prepend) {
boost::lock_guard<boost::mutex> l(_children_lock);
DCHECK(_child_map.find(name) == _child_map.end());
RuntimeProfile* child = _pool->add(new RuntimeProfile(_pool.get(), name));
RuntimeProfile* child = _pool->add(new RuntimeProfile(name));
if (_children.empty()) {
add_child_unlock(child, indent, NULL);
} else {
Expand Down
17 changes: 2 additions & 15 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,11 @@ class RuntimeProfile {
MonotonicStopWatch _sw;
};

// Create a runtime profile object with 'name'. Counters and merged profile are
// allocated from pool.
RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile = false);
// Create a runtime profile object with 'name'.
RuntimeProfile(const std::string& name, bool is_averaged_profile = false);

~RuntimeProfile();

// Deserialize from thrift. Runtime profiles are allocated from the pool.
static RuntimeProfile* create_from_thrift(
ObjectPool* pool,
const TRuntimeProfileTree& profiles);

// Adds a child profile. This is thread safe.
// 'indent' indicates whether the child will be printed w/ extra indentation
// relative to the parent.
Expand Down Expand Up @@ -601,13 +595,6 @@ class RuntimeProfile {
// for updating them.
static PeriodicCounterUpdateState _s_periodic_counter_update_state;

// Create a subtree of runtime profiles from nodes, starting at *node_idx.
// On return, *node_idx is the index one past the end of this subtree
static RuntimeProfile* create_from_thrift(
ObjectPool* pool,
const std::vector<TRuntimeProfileNode>& nodes,
int* node_idx);

// update a subtree of profiles from nodes, rooted at *idx.
// On return, *idx points to the node immediately following this subtree.
void update(const std::vector<TRuntimeProfileNode>& nodes, int* idx);
Expand Down
3 changes: 1 addition & 2 deletions be/test/exec/plain_text_line_reader_bzip_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris {

class PlainTextLineReaderTest : public testing::Test {
public:
PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") {
PlainTextLineReaderTest() : _profile("TestProfile") {
}

protected:
Expand All @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test {
virtual void TearDown() {
}
private:
ObjectPool _obj_pool;
RuntimeProfile _profile;
};

Expand Down
3 changes: 1 addition & 2 deletions be/test/exec/plain_text_line_reader_gzip_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris {

class PlainTextLineReaderTest : public testing::Test {
public:
PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") {
PlainTextLineReaderTest() : _profile("TestProfile") {
}

protected:
Expand All @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test {
virtual void TearDown() {
}
private:
ObjectPool _obj_pool;
RuntimeProfile _profile;
};

Expand Down
3 changes: 1 addition & 2 deletions be/test/exec/plain_text_line_reader_lz4frame_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris {

class PlainTextLineReaderTest : public testing::Test {
public:
PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") {
PlainTextLineReaderTest() : _profile("TestProfile") {
}

protected:
Expand All @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test {
virtual void TearDown() {
}
private:
ObjectPool _obj_pool;
RuntimeProfile _profile;
};

Expand Down
3 changes: 1 addition & 2 deletions be/test/exec/plain_text_line_reader_lzop_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris {

class PlainTextLineReaderTest : public testing::Test {
public:
PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") {
PlainTextLineReaderTest() : _profile("TestProfile") {
}

protected:
Expand All @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test {
virtual void TearDown() {
}
private:
ObjectPool _obj_pool;
RuntimeProfile _profile;
};

Expand Down
Loading