Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");

DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");

DEFINE_mBool(enable_memory_orphan_check, "false");
// Default is true. If any memory is tracked in the Orphan mem tracker, an error will be reported.
// !! Do not modify the default value of this conf !! Otherwise memory errors cannot be detected in time.
// Memory freed by the Allocator does not need to be checked, because when the thread memory tracker
// label is Orphan, the tracker saved in the Allocator is used.
DEFINE_mBool(enable_memory_orphan_check, "true");

// The maximum time a thread waits for full GC. Currently only query will wait for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
DECLARE_mInt64(crash_in_alloc_large_memory_bytes);

// Default is true. If any memory is tracked in the Orphan mem tracker, an error will be reported.
// !! Do not modify the default value of this conf !! Otherwise memory errors cannot be detected in time.
// Memory freed by the Allocator does not need to be checked, because when the thread memory tracker
// label is Orphan, the tracker saved in the Allocator is used.
DECLARE_mBool(enable_memory_orphan_check);

// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
Expand Down
4 changes: 2 additions & 2 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
Expand Down
6 changes: 3 additions & 3 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
stream_load_ctx->schema_buffer->pos /* total_length */);
stream_load_ctx->schema_buffer->flip();
RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer));
stream_load_ctx->schema_buffer()->pos /* total_length */);
stream_load_ctx->schema_buffer()->flip();
RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer()));
RETURN_IF_ERROR(pipe->finish());
*file_reader = std::move(pipe);
} else {
Expand Down
10 changes: 4 additions & 6 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ template <typename TAllocator>
PageBase<TAllocator>::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type)
: LRUCacheValueBase(), _size(b), _capacity(b) {
if (use_cache) {
_mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(page_type));
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
} else {
_mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}
}
Expand All @@ -42,7 +40,7 @@ template <typename TAllocator>
PageBase<TAllocator>::~PageBase() {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_);
TAllocator::free(_data, _capacity);
}
}
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class PageBase : private TAllocator, public LRUCacheValueBase {
// Effective size, smaller than capacity, such as data page remove checksum suffix.
size_t _size = 0;
size_t _capacity = 0;
std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
};

using DataPage = PageBase<Allocator<false>>;
Expand Down
17 changes: 10 additions & 7 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,9 @@ class StreamLoadContext {
public:
StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) {
start_millis = UnixMillis();
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
}

~StreamLoadContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->stream_load_pipe_tracker());
schema_buffer.reset();
if (need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
Expand All @@ -126,6 +121,15 @@ class StreamLoadContext {

bool is_mow_table() const;

// Returns the schema buffer, creating it on first access.
// The buffer is sized by config::stream_tvf_buffer_size and is allocated while the
// thread's mem tracker limiter is switched to the stream load pipe tracker.
ByteBufferPtr schema_buffer() {
    // Fast path: the buffer already exists, hand it out as-is.
    if (_schema_buffer != nullptr) {
        return _schema_buffer;
    }
    {
        // Keep the tracker switch confined to this scope so that it only covers
        // the allocation itself.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->stream_load_pipe_tracker());
        _schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
    }
    return _schema_buffer;
}

public:
static const int default_txn_id = -1;
// load type, eg: ROUTINE LOAD/MANUAL LOAD
Expand Down Expand Up @@ -190,8 +194,6 @@ class StreamLoadContext {
std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;

ByteBufferPtr schema_buffer;

TStreamLoadPutResult put_result;
TStreamLoadMultiTablePutResult multi_table_put_result;

Expand Down Expand Up @@ -253,6 +255,7 @@ class StreamLoadContext {

private:
ExecEnv* _exec_env = nullptr;
ByteBufferPtr _schema_buffer;
};

} // namespace doris
28 changes: 14 additions & 14 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
// Used after SCOPED_ATTACH_TASK, in order to count the memory into another
// MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task.
#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \
auto VARNAME_LINENUM(switch_mem_tracker) = SwitchThreadMemTrackerLimiter(arg1)
auto VARNAME_LINENUM(switch_mem_tracker) = doris::SwitchThreadMemTrackerLimiter(arg1)

// Looking forward to tracking memory during thread execution into MemTracker.
// Usually used to record query more detailed memory, including ExecNode operators.
Expand Down Expand Up @@ -170,8 +170,7 @@ static std::string memory_orphan_check_msg =
"each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging "
"to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the "
"thread will crash. If you want to switch MemTrackerLimiter during thread execution, "
"please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach. Of course, you "
"can modify enable_memory_orphan_check=false in be.conf to avoid this crash.";
"please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach.";

// The thread context saves some info about a working thread.
// 2 required info:
Expand Down Expand Up @@ -222,9 +221,9 @@ class ThreadContext {
ss << std::this_thread::get_id();
return ss.str();
}
// After thread_mem_tracker_mgr is initialized, the current thread Hook starts to
// consume/release mem_tracker.
// Note that the use of shared_ptr will cause a crash. The guess is that there is an
// Note that if a global memory hook is set, then after thread_mem_tracker_mgr is initialized,
// the current thread's hook starts to consume/release mem_tracker.
// In that case, the use of shared_ptr will cause a crash. The guess is that there is an
// intermediate state during the copy construction of shared_ptr: the shared_ptr is not equal
// to nullptr, but the object it points to is not yet initialized. If memory is released
// somewhere at that moment, the hook is triggered and causes the crash.
Expand Down Expand Up @@ -318,7 +317,7 @@ class ThreadLocalHandle {
// The brpc server should respond as quickly as possible.
bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
// set the data so that next time bthread_getspecific in the thread returns the data.
CHECK(0 == bthread_setspecific(btls_key, bthread_context) || k_doris_exit);
CHECK(0 == bthread_setspecific(btls_key, bthread_context) || doris::k_doris_exit);
}
DCHECK(bthread_context != nullptr);
bthread_context->thread_local_handle_count++;
Expand Down Expand Up @@ -360,7 +359,7 @@ static ThreadContext* thread_context(bool allow_return_null = false) {
// in bthread
// bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations.
auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
DCHECK(bthread_context != nullptr);
DCHECK(bthread_context != nullptr && bthread_context->thread_local_handle_count > 0);
return bthread_context;
}
if (allow_return_null) {
Expand Down Expand Up @@ -449,15 +448,16 @@ class AttachTask {

class SwitchThreadMemTrackerLimiter {
public:
explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
explicit SwitchThreadMemTrackerLimiter(
const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
ThreadLocalHandle::create_thread_local_if_not_exits();
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
}

explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) {
ThreadLocalHandle::create_thread_local_if_not_exits();
explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) {
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() ==
query_thread_context.query_id); // workload group alse not change
DCHECK(query_thread_context.query_mem_tracker);
Expand All @@ -468,11 +468,11 @@ class SwitchThreadMemTrackerLimiter {

~SwitchThreadMemTrackerLimiter() {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
ThreadLocalHandle::del_thread_local_if_count_is_zero();
doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
}

private:
std::shared_ptr<MemTrackerLimiter> _old_mem_tracker;
std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker;
};

class AddThreadMemTrackerConsumer {
Expand Down
3 changes: 2 additions & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,8 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr

std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
fmt::format("{}#{}", params.format_type, params.file_type));
fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type,
params.file_type));
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);

// make sure profile is destructed after reader because PrefetchBufferedReader
Expand Down
6 changes: 5 additions & 1 deletion be/src/util/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "common/logging.h"
#include "common/status.h"
#include "runtime/thread_context.h"
#include "vec/common/allocator.h"
#include "vec/common/allocator_fwd.h"

Expand All @@ -43,7 +44,10 @@ struct ByteBuffer : private Allocator<false> {
return Status::OK();
}

~ByteBuffer() { Allocator<false>::free(ptr, capacity); }
// Frees the buffer's storage (ptr, capacity) through the Allocator<false> base.
~ByteBuffer() {
// Switch the thread's mem tracker limiter to the tracker saved in the Allocator
// (mem_tracker_) for the duration of this scope, before the memory is freed.
// NOTE(review): the switch helper DCHECKs that the tracker is non-null; presumably the
// constructor always allocates so mem_tracker_ has been set by then - confirm.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
Allocator<false>::free(ptr, capacity);
}

void put_bytes(const char* data, size_t size) {
memcpy(ptr + pos, data, size);
Expand Down
33 changes: 31 additions & 2 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,43 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory(
size_t size) const {
size_t size) {
// Usually, an object that inherits from Allocator sees the same TLS tracker on every alloc.
// If such an object needs to be reused by multiple queries, the same tracker must be
// switched into TLS whenever alloc is called.
// However, in the ORC Reader the ORC DataBuffer is reused but we cannot switch the TLS tracker,
// so we update the Allocator tracker whenever the TLS tracker changes.
// Note that when an object that inherits from Allocator is constructed, the thread context may
// not have a memory tracker attached in TLS yet; usually a memory tracker is only attached
// in TLS at the time of the first alloc.
if (mem_tracker_ == nullptr ||
mem_tracker_->label() != doris::thread_context()->thread_mem_tracker()->label()) {
mem_tracker_ = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}
CONSUME_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory(
size_t size) const {
RELEASE_THREAD_MEM_TRACKER(size);
doris::ThreadContext* thread_context = doris::thread_context(true);
if ((thread_context && thread_context->thread_mem_tracker()->label() != "Orphan") ||
mem_tracker_ == nullptr) {
// If thread_context exists and the label of thread_mem_tracker is not `Orphan`,
// we are still inside the scope of SCOPED_ATTACH_TASK,
// so thread_mem_tracker should be used to release memory.
// mem_tracker_ can be nullptr when an object that inherits from Allocator
// frees memory without ever having called alloc:
// in phmap, memory allocated by one object may be transferred to another object and then freed.
// In this case, the thread context must have a memory tracker other than Orphan attached,
// otherwise memory tracking will be wrong.
RELEASE_THREAD_MEM_TRACKER(size);
} else {
// If thread_context does not exist or the label of thread_mem_tracker is
// `Orphan`, this usually happens during object destruction. It means that
// the scope of SCOPED_ATTACH_TASK has been left, so release memory using the Allocator tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
RELEASE_THREAD_MEM_TRACKER(size);
}
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/common/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
// is always a multiple of sixteen. (https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html)
static constexpr int ALLOCATOR_ALIGNMENT_16 = 16;

namespace doris {
class MemTrackerLimiter;
}

class DefaultMemoryAllocator {
public:
static void* malloc(size_t size) __THROW { return std::malloc(size); }
Expand Down Expand Up @@ -228,7 +232,7 @@ class Allocator {
// alloc will continue to execute, so the consume memtracker is forced.
void memory_check(size_t size) const;
// Increases consumption of this tracker by 'bytes'.
void consume_memory(size_t size) const;
void consume_memory(size_t size);
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
#ifndef NDEBUG
Expand Down Expand Up @@ -400,6 +404,8 @@ class Allocator {

static constexpr bool clear_memory = clear_memory_;

std::shared_ptr<doris::MemTrackerLimiter> mem_tracker_ {nullptr};

// Freshly mmapped pages are copy-on-write references to a global zero page.
// On the first write, a page fault occurs, and an actual writable page is
// allocated. If we are going to use this memory soon, such as when resizing
Expand Down