Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/exec/parquet_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "exec/parquet_scanner.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/parquet_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class ExprContext;
class TupleDescriptor;
class TupleRow;
class RowDescriptor;
class MemTracker;
class RuntimeProfile;
class StreamLoadPipe;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
}
_runtime_profile = _runtime_state->runtime_profile();
_runtime_profile->set_name("PushBrokerReader");
_mem_tracker.reset(new MemTracker(_runtime_profile, -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker()));
_mem_tracker = MemTracker::CreateTracker(-1, "PushBrokerReader", _runtime_state->instance_mem_tracker());
_mem_pool.reset(new MemPool(_mem_tracker.get()));
_counter.reset(new ScannerCounter());

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class PushBrokerReader {
const Schema* _schema;
std::unique_ptr<RuntimeState> _runtime_state;
RuntimeProfile* _runtime_profile;
std::unique_ptr<MemTracker> _mem_tracker;
std::shared_ptr<MemTracker> _mem_tracker;
std::unique_ptr<MemPool> _mem_pool;
std::unique_ptr<ScannerCounter> _counter;
std::unique_ptr<BaseScanner> _scanner;
Expand Down
58 changes: 19 additions & 39 deletions be/src/runtime/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ static std::shared_ptr<MemTracker> root_tracker;
static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT;

void MemTracker::CreateRootTracker() {
root_tracker.reset(new MemTracker(-1, "root", std::shared_ptr<MemTracker>()));
root_tracker.reset(new MemTracker(-1, "root"));
root_tracker->Init();
}

Expand All @@ -85,7 +85,7 @@ std::shared_ptr<MemTracker> MemTracker::CreateTracker(
} else {
real_parent = GetRootTracker();
}
shared_ptr<MemTracker> tracker(new MemTracker(byte_limit, label, real_parent, log_usage_if_zero));
shared_ptr<MemTracker> tracker(new MemTracker(nullptr, byte_limit, label, real_parent, log_usage_if_zero));
real_parent->AddChildTracker(tracker);
tracker->Init();

Expand All @@ -102,56 +102,36 @@ std::shared_ptr<MemTracker> MemTracker::CreateTracker(
} else {
real_parent = GetRootTracker();
}
shared_ptr<MemTracker> tracker(new MemTracker(profile, byte_limit, label, real_parent));
shared_ptr<MemTracker> tracker(new MemTracker(profile, byte_limit, label, real_parent, true));
real_parent->AddChildTracker(tracker);
tracker->Init();

return tracker;
}

MemTracker::MemTracker(int64_t byte_limit, const std::string& label) :
MemTracker(nullptr, byte_limit, label, std::shared_ptr<MemTracker>(), true) {
}

MemTracker::MemTracker(
RuntimeProfile* profile,
int64_t byte_limit, const string& label, const std::shared_ptr<MemTracker>& parent, bool log_usage_if_zero)
: limit_(byte_limit),
soft_limit_(CalcSoftLimit(byte_limit)),
label_(label),
parent_(parent),
consumption_(std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)),
consumption_metric_(nullptr),
log_usage_if_zero_(log_usage_if_zero),
num_gcs_metric_(nullptr),
bytes_freed_by_last_gc_metric_(nullptr),
bytes_over_limit_metric_(nullptr),
limit_metric_(nullptr) {
}

MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
const std::string& label, const std::shared_ptr<MemTracker>& parent)
: limit_(byte_limit),
soft_limit_(CalcSoftLimit(byte_limit)),
label_(label),
parent_(parent),
consumption_(profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
consumption_metric_(nullptr),
log_usage_if_zero_(true),
num_gcs_metric_(nullptr),
bytes_freed_by_last_gc_metric_(nullptr),
bytes_over_limit_metric_(nullptr),
limit_metric_(nullptr) {
}

MemTracker::MemTracker(IntGauge* consumption_metric,
int64_t byte_limit, const string& label, const std::shared_ptr<MemTracker>& parent)
: limit_(byte_limit),
soft_limit_(CalcSoftLimit(byte_limit)),
label_(label),
parent_(parent),
consumption_(std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)),
consumption_metric_(consumption_metric),
log_usage_if_zero_(true),
num_gcs_metric_(nullptr),
bytes_freed_by_last_gc_metric_(nullptr),
bytes_over_limit_metric_(nullptr),
limit_metric_(nullptr) {
if (profile == nullptr) {
consumption_ = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
} else {
consumption_ = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
}
}

void MemTracker::Init() {
Expand Down Expand Up @@ -232,23 +212,23 @@ int64_t MemTracker::GetPoolMemReserved() {
return mem_reserved;
}

MemTracker* PoolMemTrackerRegistry::GetRequestPoolMemTracker(
std::shared_ptr<MemTracker> PoolMemTrackerRegistry::GetRequestPoolMemTracker(
const string& pool_name, bool create_if_not_present) {
DCHECK(!pool_name.empty());
lock_guard<SpinLock> l(pool_to_mem_trackers_lock_);
PoolTrackersMap::iterator it = pool_to_mem_trackers_.find(pool_name);
if (it != pool_to_mem_trackers_.end()) {
MemTracker* tracker = it->second.get();
DCHECK(pool_name == tracker->pool_name_);
return tracker;
return it->second;
}
if (!create_if_not_present) return nullptr;
// First time this pool_name registered, make a new object.
MemTracker* tracker =
new MemTracker(-1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name),
ExecEnv::GetInstance()->process_mem_tracker());
std::shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(
-1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name),
ExecEnv::GetInstance()->process_mem_tracker());
tracker->pool_name_ = pool_name;
pool_to_mem_trackers_.emplace(pool_name, unique_ptr<MemTracker>(tracker));
pool_to_mem_trackers_.emplace(pool_name, std::shared_ptr<MemTracker>(tracker));
return tracker;
}

Expand Down
42 changes: 19 additions & 23 deletions be/src/runtime/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,9 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
const std::string& label = std::string(),
const std::shared_ptr<MemTracker>& parent = std::shared_ptr<MemTracker>());

/// 'byte_limit' < 0 means no limit
/// 'label' is the label used in the usage string (LogUsage())
/// If 'log_usage_if_zero' is false, this tracker (and its children) will not be
/// included
/// in LogUsage() output if consumption is 0.
MemTracker(int64_t byte_limit = -1, const std::string& label = std::string(),
const std::shared_ptr<MemTracker>& parent = std::shared_ptr<MemTracker>(),
bool log_usage_if_zero = true);

/// C'tor for tracker for which consumption counter is created as part of a profile.
/// The counter is created with name COUNTER_NAME.
MemTracker(RuntimeProfile* profile, int64_t byte_limit,
const std::string& label = std::string(), const std::shared_ptr<MemTracker>& parent = std::shared_ptr<MemTracker>());

// TODO(yingchun): not used, remove it later
/// C'tor for tracker that uses consumption_metric as the consumption value.
/// Consume()/Release() can still be called. This is used for the root process tracker
/// (if 'parent' is NULL). It is also to report on other categories of memory under the
/// process tracker, e.g. buffer pool free buffers (if 'parent - non-NULL).
MemTracker(IntGauge* consumption_metric, int64_t byte_limit = -1,
const std::string& label = std::string(), const std::shared_ptr<MemTracker>& parent = std::shared_ptr<MemTracker>());
// this is used for creating an orphan mem tracker, or for unit test.
// If a mem tracker has parent, it should be created by `CreateTracker()`
MemTracker(int64_t byte_limit = -1, const std::string& label = std::string());

~MemTracker();

Expand Down Expand Up @@ -432,6 +414,15 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {

static const std::string COUNTER_NAME;

private:
/// 'byte_limit' < 0 means no limit
/// 'label' is the label used in the usage string (LogUsage())
/// If 'log_usage_if_zero' is false, this tracker (and its children) will not be
/// included in LogUsage() output if consumption is 0.
MemTracker(RuntimeProfile* profile, int64_t byte_limit, const std::string& label,
const std::shared_ptr<MemTracker>& parent,
bool log_usage_if_zero);

private:
friend class PoolMemTrackerRegistry;

Expand Down Expand Up @@ -588,14 +579,19 @@ class PoolMemTrackerRegistry {
/// with the process tracker as its parent. There is no explicit per-pool byte_limit
/// set at any particular impalad, so newly created trackers will always have a limit
/// of -1.
MemTracker* GetRequestPoolMemTracker(
/// TODO(cmy): this function is not used for now. the memtracker returned from here is
/// got from a shared_ptr in `pool_to_mem_trackers_`.
/// This funtion is from
/// https://github.com/cloudera/Impala/blob/495397101e5807c701df71ea288f4815d69c2c8a/be/src/runtime/mem-tracker.h#L497
/// And in impala this function will return a raw pointer.
std::shared_ptr<MemTracker> GetRequestPoolMemTracker(
const std::string& pool_name, bool create_if_not_present);

private:
/// All per-request pool MemTracker objects. It is assumed that request pools will live
/// for the entire duration of the process lifetime so MemTrackers are never removed
/// from this map. Protected by '_pool_to_mem_trackers_lock'
typedef std::unordered_map<std::string, std::unique_ptr<MemTracker>> PoolTrackersMap;
typedef std::unordered_map<std::string, std::shared_ptr<MemTracker>> PoolTrackersMap;
PoolTrackersMap pool_to_mem_trackers_;
/// IMPALA-3068: Use SpinLock instead of std::mutex so that the lock won't
/// automatically destroy itself as part of process teardown, which could cause races.
Expand Down