Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");

DEFINE_mString(get_stack_trace_tool, "libunwind");
DEFINE_mString(dwarf_location_info_mode, "FAST");
DEFINE_mBool(enable_address_sanitizers_with_stack_trace, "false");

// the ratio of _prefetch_size/_batch_size in AutoIncIDBuffer
DEFINE_mInt64(auto_inc_prefetch_size_ratio, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,7 @@ DECLARE_mString(kerberos_krb5_conf_path);

// Values include `none`, `glog`, `boost`, `glibc`, `libunwind`
DECLARE_mString(get_stack_trace_tool);
DECLARE_mBool(enable_address_sanitizers_with_stack_trace);

// DISABLED: Don't resolve location info.
// FAST: Perform CU lookup using .debug_aranges (might be incomplete).
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ class ExecEnv {
std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
return _segcompaction_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
return _point_query_executor_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> rowid_storage_reader_tracker() {
return _rowid_storage_reader_tracker;
}
Expand Down Expand Up @@ -348,6 +351,7 @@ class ExecEnv {
std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;

// TODO, looking forward to more accurate tracking.
std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ void ExecEnv::init_mem_tracker() {
std::make_shared<MemTracker>("IOBufBlockMemory", _details_mem_tracker_set.get());
_segcompaction_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction");
_point_query_executor_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor");
_rowid_storage_reader_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader");
_subcolumns_tree_tracker =
Expand Down
82 changes: 77 additions & 5 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/stack_util.h"

namespace doris {

Expand Down Expand Up @@ -99,7 +100,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLi
MemTrackerLimiter::~MemTrackerLimiter() {
consume(_untracked_mem);
static std::string mem_tracker_inaccurate_msg =
", mem tracker not equal to 0 when mem tracker destruct, this usually means that "
"mem tracker not equal to 0 when mem tracker destruct, this usually means that "
"memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
"1. For query and load, memory leaks may have occurred, it is expected that the query "
Expand All @@ -115,19 +116,90 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_consumption->current_value() != 0) {
// TODO, expect mem tracker equal to 0 at the task end.
if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) {
LOG(INFO) << "mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value()
<< mem_tracker_inaccurate_msg;
std::string err_msg =
fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.",
label(), _consumption->current_value(), _consumption->peak_value(),
mem_tracker_inaccurate_msg);
#ifdef NDEBUG
LOG(INFO) << err_msg;
#else
LOG(FATAL) << err_msg << print_address_sanitizers();
#endif
}
if (ExecEnv::tracking_memory()) {
ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value());
}
_consumption->set(0);
#ifndef NDEBUG
} else if (!_address_sanitizers.empty()) {
LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< print_address_sanitizers();
#endif
}
g_memtrackerlimiter_cnt << -1;
}

#ifndef NDEBUG
void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
LOG(FATAL) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace()
<< ", old stack_trace: " << it->second.stack_trace;
}

// if alignment not equal to 0, maybe usable_size > size.
AddressSanitizer as = {size, doris::config::enable_address_sanitizers_with_stack_trace
? get_stack_trace()
: ""};
_address_sanitizers.emplace(buf, as);
}
}

void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
if (it->second.size != size) {
LOG(FATAL) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
"label: "
<< _label << ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value()
<< ", buf: " << buf << ", size: " << size << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace()
<< ", old stack_trace: " << it->second.stack_trace;
}
_address_sanitizers.erase(buf);
} else {
LOG(FATAL) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", stack_trace: " << get_stack_trace();
}
}
}

std::string MemTrackerLimiter::print_address_sanitizers() {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
std::string detail = "[Address Sanitizer]:";
for (const auto& it : _address_sanitizers) {
detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size,
it.second.stack_trace);
}
return detail;
}
#endif

MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const {
Snapshot snapshot;
snapshot.type = type_string(_type);
Expand Down
16 changes: 16 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ class MemTrackerLimiter final : public MemTracker {
// Log the memory usage when memory limit is exceeded.
std::string tracker_limit_exceeded_str();

#ifndef NDEBUG
void add_address_sanitizers(void* buf, size_t size);
void remove_address_sanitizers(void* buf, size_t size);
std::string print_address_sanitizers();
#endif

std::string debug_string() override {
std::stringstream msg;
msg << "limit: " << _limit << "; "
Expand Down Expand Up @@ -274,6 +280,16 @@ class MemTrackerLimiter final : public MemTracker {
// Avoid frequent printing.
bool _enable_print_log_usage = false;
static std::atomic<bool> _enable_print_log_process_usage;

#ifndef NDEBUG
struct AddressSanitizer {
size_t size;
std::string stack_trace;
};

std::mutex _address_sanitizers_mtx;
std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
#endif
};

inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ class AddThreadMemTrackerConsumerByHook {
// must call create_thread_local_if_not_exits() before use thread_context().
#define CONSUME_THREAD_MEM_TRACKER(size) \
do { \
if (doris::use_mem_hook || size == 0) { \
if (size == 0 || doris::use_mem_hook) { \
break; \
} \
if (doris::pthread_context_ptr_init) { \
Expand Down
11 changes: 5 additions & 6 deletions be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/key_util.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
Expand Down Expand Up @@ -165,7 +166,8 @@ void RowCache::erase(const RowCacheKey& key) {
}

PointQueryExecutor::~PointQueryExecutor() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->point_query_executor_mem_tracker());
_tablet.reset();
_reusable.reset();
_result_block.reset();
Expand All @@ -179,10 +181,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
// using cache
__int128_t uuid =
static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low();
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY,
fmt::format("PointQueryExecutor:{}#{}", uuid, request->tablet_id()));
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
auto cache_handle = LookupConnectionCache::instance()->get(uuid);
_binary_row_format = request->is_binary_row();
if (cache_handle != nullptr) {
Expand Down Expand Up @@ -230,7 +229,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
}

Status PointQueryExecutor::lookup_up() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
RETURN_IF_ERROR(_lookup_row_key());
RETURN_IF_ERROR(_lookup_row_data());
RETURN_IF_ERROR(_output_data());
Expand Down
1 change: 0 additions & 1 deletion be/src/service/point_query_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ class PointQueryExecutor {
std::vector<RowReadContext> _row_read_ctxs;
std::shared_ptr<Reusable> _reusable;
std::unique_ptr<vectorized::Block> _result_block;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
Metrics _profile_metrics;
bool _binary_row_format = false;
// snapshot read version
Expand Down
6 changes: 6 additions & 0 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "gutil/endian.h"
#include "gutil/strings/substitute.h"
#include "orc/OrcFile.hh"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "util/defer_op.h"
#include "util/faststring.h"
Expand Down Expand Up @@ -767,6 +768,7 @@ class ZstdBlockCompression : public BlockCompressionCodec {
return &s_instance;
}
~ZstdBlockCompression() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context);
for (auto ctx : _ctx_c_pool) {
_delete_compression_ctx(ctx);
}
Expand All @@ -786,6 +788,7 @@ class ZstdBlockCompression : public BlockCompressionCodec {
// https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
_query_thread_context.init();
CContext* context;
RETURN_IF_ERROR(_acquire_compression_ctx(&context));
bool compress_failed = false;
Expand Down Expand Up @@ -864,6 +867,7 @@ class ZstdBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
_query_thread_context.init();
DContext* context;
bool decompress_failed = false;
RETURN_IF_ERROR(_acquire_decompression_ctx(&context));
Expand Down Expand Up @@ -960,6 +964,8 @@ class ZstdBlockCompression : public BlockCompressionCodec {

mutable std::mutex _ctx_d_mutex;
mutable std::vector<DContext*> _ctx_d_pool;

QueryThreadContext _query_thread_context;
};

class GzipBlockCompression : public ZlibBlockCompression {
Expand Down
24 changes: 24 additions & 0 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,30 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
}

#ifndef NDEBUG
template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::add_address_sanitizers(void* buf,
size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
}
#endif
doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::remove_address_sanitizers(
void* buf, size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
}
#endif
doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size);
}
#endif

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, size_t alignment) {
return alloc_impl(size, alignment);
Expand Down
25 changes: 25 additions & 0 deletions be/src/vec/common/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,18 @@ class Allocator {
void consume_memory(size_t size) const;
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
#ifndef NDEBUG
void add_address_sanitizers(void* buf, size_t size) const;
void remove_address_sanitizers(void* buf, size_t size) const;
#endif

void* alloc(size_t size, size_t alignment = 0);
void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0);

/// Allocate memory range.
void* alloc_impl(size_t size, size_t alignment = 0) {
memory_check(size);
// consume memory in tracker before alloc, similar to early declaration.
consume_memory(size);
void* buf;

Expand Down Expand Up @@ -125,6 +130,9 @@ class Allocator {
release_memory(size);
throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size));
}
#ifndef NDEBUG
add_address_sanitizers(buf, size);
#endif
} else {
buf = nullptr;
int res = posix_memalign(&buf, alignment, size);
Expand All @@ -134,6 +142,9 @@ class Allocator {
throw_bad_alloc(
fmt::format("Cannot allocate memory (posix_memalign) {}.", size));
}
#ifndef NDEBUG
add_address_sanitizers(buf, size);
#endif

if constexpr (clear_memory) memset(buf, 0, size);
}
Expand All @@ -148,6 +159,9 @@ class Allocator {
throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size));
}
} else {
#ifndef NDEBUG
remove_address_sanitizers(buf, size);
#endif
::free(buf);
}
release_memory(size);
Expand All @@ -169,13 +183,20 @@ class Allocator {
if (!use_mmap ||
(old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold &&
alignment <= MALLOC_MIN_ALIGNMENT)) {
#ifndef NDEBUG
remove_address_sanitizers(buf, old_size);
#endif
/// Resize malloc'd memory region with no special alignment requirement.
void* new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf) {
release_memory(new_size - old_size);
throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size,
new_size));
}
#ifndef NDEBUG
add_address_sanitizers(
new_buf, new_size); // usually, buf addr = new_buf addr, asan maybe not equal.
#endif

buf = new_buf;
if constexpr (clear_memory)
Expand Down Expand Up @@ -205,6 +226,10 @@ class Allocator {
// Big allocs that requires a copy.
void* new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
#ifndef NDEBUG
add_address_sanitizers(new_buf, new_size);
remove_address_sanitizers(buf, old_size);
#endif
free(buf, old_size);
buf = new_buf;
}
Expand Down
Loading