Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

namespace doris {

#ifndef ARRAY_SIZE
#define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0]))
#endif

struct AuthInfo {
std::string user;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Status ExchangeNode::prepare(RuntimeState* state) {
Status ExchangeNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
if (_is_merging) {
RETURN_IF_ERROR(_sort_exec_exprs.open(state));
Expand Down Expand Up @@ -215,7 +216,6 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next."));

ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
while ((_num_rows_skipped < _offset)) {
_num_rows_skipped += output_batch->num_rows();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
_version(-1),
_mem_tracker(MemTracker::create_tracker(
tracker->limit(),
tracker->label() + ":OlapScanner:" + thread_local_ctx.get()->thread_id_str(),
tracker->label() + ":OlapScanner:" + tls_ctx()->thread_id_str(),
tracker)) {}

Status OlapScanner::prepare(
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int
_tuple_data_buffer_ptr = &_tuple_data_buffer;
}
_node_channel_tracker =
MemTracker::create_tracker(-1, "NodeChannel" + thread_local_ctx.get()->thread_id_str());
MemTracker::create_tracker(-1, "NodeChannel" + tls_ctx()->thread_id_str());
}

NodeChannel::~NodeChannel() noexcept {
Expand Down Expand Up @@ -654,6 +654,7 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {

void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
int64_t tablet_id) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
const auto& it = _tablets_by_channel.find(node_id);
if (it == _tablets_by_channel.end()) {
return;
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "exec/tablet_info.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/thread_context.h"
#include "util/bitmap.h"
#include "util/countdown_latch.h"
#include "util/ref_count_closure.h"
Expand Down Expand Up @@ -325,6 +326,7 @@ class IndexChannel {

void for_each_node_channel(
const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
for (auto& it : _node_channels) {
func(it.second);
}
Expand Down Expand Up @@ -365,7 +367,7 @@ class IndexChannel {
std::unordered_map<int64_t, std::string> _failed_channels_msgs;
Status _intolerable_failure_status = Status::OK();

std::shared_ptr<MemTracker> _index_channel_tracker; // TODO(zxy) use after
std::shared_ptr<MemTracker> _index_channel_tracker;
};

// Write data to Olap Table.
Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/byte_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <sys/mman.h>

#include "olap/utils.h"
#include "runtime/thread_context.h"

namespace doris {

Expand All @@ -42,6 +43,8 @@ void StorageByteBuffer::BufDeleter::operator()(char* p) {
if (0 != munmap(p, _mmap_length)) {
LOG(FATAL) << "fail to munmap: mem=" << p << ", len=" << _mmap_length
<< ", errno=" << Errno::no() << ", errno_str=" << Errno::str();
} else {
RELEASE_THREAD_LOCAL_MEM_TRACKER(_mmap_length);
}
} else {
delete[] p;
Expand Down Expand Up @@ -93,10 +96,12 @@ StorageByteBuffer* StorageByteBuffer::reference_buffer(StorageByteBuffer* refere

StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int prot, int flags,
int fd, uint64_t offset) {
CONSUME_THREAD_LOCAL_MEM_TRACKER(length);
char* memory = (char*)::mmap(start, length, prot, flags, fd, offset);

if (MAP_FAILED == memory) {
OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", Errno::no(), Errno::str());
RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
return nullptr;
}

Expand All @@ -108,6 +113,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int pro
if (nullptr == buf) {
deleter(memory);
OLAP_LOG_WARNING("fail to allocate StorageByteBuffer.");
RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
return nullptr;
}

Expand All @@ -128,10 +134,12 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset

size_t length = handler->length();
int fd = handler->fd();
CONSUME_THREAD_LOCAL_MEM_TRACKER(length);
char* memory = (char*)::mmap(nullptr, length, prot, flags, fd, offset);

if (MAP_FAILED == memory) {
OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", Errno::no(), Errno::str());
RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
return nullptr;
}

Expand All @@ -143,6 +151,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset
if (nullptr == buf) {
deleter(memory);
OLAP_LOG_WARNING("fail to allocate StorageByteBuffer.");
RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
return nullptr;
}

Expand Down Expand Up @@ -173,7 +182,7 @@ Status StorageByteBuffer::put(uint64_t index, char src) {
}

Status StorageByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset,
uint64_t length) {
uint64_t length) {
//没有足够的空间可以写
if (length > remaining()) {
return Status::OLAPInternalError(OLAP_ERR_BUFFER_OVERFLOW);
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t
CachePriority priority) {
// The memory of the parameter value should be recorded in the tls mem tracker,
// transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
charge);
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge);
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/out_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
namespace doris {

OutStreamFactory::OutStreamFactory(CompressKind compress_kind, uint32_t stream_buffer_size)
: _compress_kind(compress_kind), _stream_buffer_size(stream_buffer_size) {
: _stream_buffer_size(stream_buffer_size) {
switch (compress_kind) {
case COMPRESS_NONE:
_compressor = nullptr;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/out_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ class OutStreamFactory {

private:
std::map<StreamName, OutStream*> _streams; // All created streams
CompressKind _compress_kind;
Compressor _compressor;
uint32_t _stream_buffer_size;

Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/rowset/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,7 @@ void ByteColumnWriter::record_position() {

IntegerColumnWriter::IntegerColumnWriter(uint32_t column_id, uint32_t unique_column_id,
OutStreamFactory* stream_factory, bool is_singed)
: _column_id(column_id),
_unique_column_id(unique_column_id),
: _unique_column_id(unique_column_id),
_stream_factory(stream_factory),
_writer(nullptr),
_is_signed(is_singed) {}
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ class IntegerColumnWriter {
Status flush() { return _writer->flush(); }

private:
uint32_t _column_id;
uint32_t _unique_column_id;
OutStreamFactory* _stream_factory;
RunLengthIntegerWriter* _writer;
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ Segment::Segment(const FilePathDesc& path_desc, uint32_t segment_id,
const TabletSchema* tablet_schema)
: _path_desc(path_desc), _segment_id(segment_id), _tablet_schema(tablet_schema) {
#ifndef BE_TEST
_mem_tracker = MemTracker::create_virtual_tracker(
-1, "Segment", StorageEngine::instance()->tablet_mem_tracker());
_mem_tracker = StorageEngine::instance()->tablet_mem_tracker();
#else
_mem_tracker = MemTracker::create_virtual_tracker(-1, "Segment");
_mem_tracker = MemTracker::get_process_tracker();
#endif
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/bufferpool/system_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "common/config.h"
#include "gutil/strings/substitute.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "util/error_util.h"

Expand Down Expand Up @@ -75,9 +76,11 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) {
// Map an extra huge page so we can fix up the alignment if needed.
map_len += HUGE_PAGE_SIZE;
}
CONSUME_THREAD_LOCAL_MEM_TRACKER(map_len);
uint8_t* mem = reinterpret_cast<uint8_t*>(
mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
if (mem == MAP_FAILED) {
RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len);
return Status::BufferAllocFailed("mmap failed");
}

Expand All @@ -89,10 +92,12 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) {
if (misalignment != 0) {
uintptr_t fixup = HUGE_PAGE_SIZE - misalignment;
munmap(mem, fixup);
RELEASE_THREAD_LOCAL_MEM_TRACKER(fixup);
mem += fixup;
map_len -= fixup;
}
munmap(mem + len, map_len - len);
RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len - len);
DCHECK_EQ(reinterpret_cast<uintptr_t>(mem) % HUGE_PAGE_SIZE, 0) << mem;
// Mark the buffer as a candidate for promotion to huge pages. The Linux Transparent
// Huge Pages implementation will try to back the memory with a huge page if it is
Expand Down Expand Up @@ -142,6 +147,7 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) {
void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) {
if (config::mmap_buffers) {
int rc = munmap(buffer.data(), buffer.len());
RELEASE_THREAD_LOCAL_MEM_TRACKER(buffer.len());
DCHECK_EQ(rc, 0) << "Unexpected munmap() error: " << errno;
} else {
bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages;
Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/disk_io_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ void DiskIoMgr::BufferDescriptor::reset(RequestContext* reader, ScanRange* range
_eosr = false;
_status = Status::OK();
// Consume in the tls mem tracker when the buffer is allocated.
_buffer_mem_tracker = thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get();
_buffer_mem_tracker = tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get();
}

void DiskIoMgr::BufferDescriptor::return_buffer() {
Expand Down Expand Up @@ -739,7 +739,7 @@ char* DiskIoMgr::get_free_buffer(int64_t* buffer_size) {
buffer = new char[*buffer_size];
} else {
// This means the buffer's memory ownership is transferred from DiskIoMgr to tls tracker.
_mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), *buffer_size);
_mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), *buffer_size);
buffer = _free_buffers[idx].front();
_free_buffers[idx].pop_front();
}
Expand Down Expand Up @@ -767,7 +767,7 @@ void DiskIoMgr::gc_io_buffers(int64_t bytes_to_free) {
// The deleted buffer is released in the tls mem tracker, the deleted buffer belongs to DiskIoMgr,
// so the freed memory should be recorded in the DiskIoMgr mem tracker. So if the tls mem tracker
// and the DiskIoMgr tracker are different, transfer memory ownership.
_mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), bytes_freed);
_mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), bytes_freed);
}

void DiskIoMgr::return_free_buffer(BufferDescriptor* desc) {
Expand All @@ -793,7 +793,7 @@ void DiskIoMgr::return_free_buffer(char* buffer, int64_t buffer_size, MemTracker
// The deleted buffer is released in the tls mem tracker. When the buffer was allocated,
// it was consumed in BufferDescriptor->buffer_mem_tracker, so if the tls mem tracker and
// the tracker in the parameters are different, transfer memory ownership.
tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), buffer_size);
tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), buffer_size);
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fold_constant_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ TUniqueId FoldConstantExecutor::_dummy_id;

Status FoldConstantExecutor::fold_constant_expr(
const TFoldConstantParams& params, PConstantExprResult* response) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const auto& expr_map = params.expr_map;
auto expr_result_map = response->mutable_expr_result_map();

Expand All @@ -53,7 +54,6 @@ Status FoldConstantExecutor::fold_constant_expr(
if (UNLIKELY(!status.ok())) {
return status;
}
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);

for (const auto& m : expr_map) {
PExprResultMap pexpr_result_map;
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
.instance_id(exec_state->fragment_instance_id())
.tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
#ifndef BE_TEST
SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state()->query_type(),
print_id(exec_state->query_id()), exec_state->fragment_instance_id(),
SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state(),
exec_state->executor()->runtime_state()->instance_mem_tracker());
#endif
exec_state->execute();
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ MemPool::MemPool()
total_allocated_bytes_(0),
total_reserved_bytes_(0),
peak_allocated_bytes_(0),
_mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get()) {}
_mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get()) {}

MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) {
DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size);
Expand Down
20 changes: 18 additions & 2 deletions be/src/runtime/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ MemTracker* MemTracker::get_raw_process_tracker() {
return raw_process_tracker;
}

// Track memory for all brpc server responses.
static std::shared_ptr<MemTracker> brpc_server_tracker;
static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT;

void MemTracker::create_brpc_server_tracker() {
brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", get_process_tracker(), MemTrackerLevel::OVERVIEW);
}

std::shared_ptr<MemTracker> MemTracker::get_brpc_server_tracker() {
GoogleOnceInit(&brpc_server_tracker_once, &MemTracker::create_brpc_server_tracker);
return brpc_server_tracker;
}

void MemTracker::list_process_trackers(std::vector<std::shared_ptr<MemTracker>>* trackers) {
trackers->clear();
std::deque<std::shared_ptr<MemTracker>> to_process;
Expand Down Expand Up @@ -88,7 +101,8 @@ std::shared_ptr<MemTracker> MemTracker::create_tracker(int64_t byte_limit, const
const std::shared_ptr<MemTracker>& parent,
MemTrackerLevel level,
RuntimeProfile* profile) {
std::shared_ptr<MemTracker> reset_parent = parent ? parent : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker();
std::shared_ptr<MemTracker> reset_parent =
parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
DCHECK(reset_parent);

std::shared_ptr<MemTracker> tracker(
Expand All @@ -102,7 +116,8 @@ std::shared_ptr<MemTracker> MemTracker::create_tracker(int64_t byte_limit, const
std::shared_ptr<MemTracker> MemTracker::create_virtual_tracker(
int64_t byte_limit, const std::string& label, const std::shared_ptr<MemTracker>& parent,
MemTrackerLevel level) {
std::shared_ptr<MemTracker> reset_parent = parent ? parent : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker();
std::shared_ptr<MemTracker> reset_parent =
parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
DCHECK(reset_parent);

std::shared_ptr<MemTracker> tracker(
Expand All @@ -121,6 +136,7 @@ MemTracker::MemTracker(int64_t byte_limit, const std::string& label,
RuntimeProfile* profile)
: _limit(byte_limit),
_label(label),
// Not 100% sure the id is unique. This is generated because it is faster than converting to int after hash.
_id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()),
_parent(parent),
_level(level) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class MemTracker {
// Gets a shared_ptr to the "process" tracker, creating it if necessary.
static std::shared_ptr<MemTracker> get_process_tracker();
static MemTracker* get_raw_process_tracker();
// Gets a shared_ptr to the "brpc server" tracker, creating it if necessary.
static std::shared_ptr<MemTracker> get_brpc_server_tracker();

Status check_sys_mem_info(int64_t bytes) {
if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) {
Expand Down Expand Up @@ -464,6 +466,8 @@ class MemTracker {

// Creates the process tracker.
static void create_process_tracker();
// Creates the brpc server tracker.
static void create_brpc_server_tracker();

// Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit.
int64_t _limit;
Expand Down
Loading