Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header
#include "util/threadpool.h"

namespace orc {
class MemoryPool;
}
namespace arrow {
class MemoryPool;
}

namespace doris {
namespace vectorized {
class VDataStreamMgr;
Expand Down Expand Up @@ -305,6 +312,9 @@ class ExecEnv {
segment_v2::TmpFileDirs* get_tmp_file_dirs() { return _tmp_file_dirs.get(); }
io::FDCache* file_cache_open_fd_cache() const { return _file_cache_open_fd_cache.get(); }

orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; }
arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; }

private:
ExecEnv();

Expand Down Expand Up @@ -435,6 +445,9 @@ class ExecEnv {
std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs;
doris::vectorized::SpillStreamManager* _spill_stream_mgr = nullptr;

orc::MemoryPool* _orc_memory_pool = nullptr;
arrow::MemoryPool* _arrow_memory_pool = nullptr;
};

template <>
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
#include "util/timezone_utils.h"
#include "vec/exec/format/orc/orc_memory_pool.h"
#include "vec/exec/format/parquet/arrow_memory_pool.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/delta_writer_v2_pool.h"
Expand Down Expand Up @@ -573,6 +575,10 @@ Status ExecEnv::_init_mem_env() {
<< PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::inverted_index_query_cache_limit;

// init orc memory pool
_orc_memory_pool = new doris::vectorized::ORCMemoryPool();
_arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();

return Status::OK();
}

Expand Down Expand Up @@ -751,6 +757,9 @@ void ExecEnv::destroy() {
// We should free task scheduler finally because task queue / scheduler maybe used by pipelineX.
SAFE_DELETE(_without_group_task_scheduler);

SAFE_DELETE(_arrow_memory_pool);
SAFE_DELETE(_orc_memory_pool);

// dns cache is a global instance and need to be released at last
SAFE_DELETE(_dns_cache);

Expand Down
2 changes: 1 addition & 1 deletion be/src/util/faststring.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace doris {
// common use cases (in particular, resize() will fill with uninitialized data
// instead of memsetting to \0)
// only build() can transfer data to the outside.
class faststring : private Allocator<false, false, false> {
class faststring : private Allocator<false, false, false, DefaultMemoryAllocator> {
public:
enum { kInitialCapacity = 32 };

Expand Down
2 changes: 1 addition & 1 deletion be/src/util/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ struct SliceMap {
//
// only receive the memory allocated by Allocator and disables mmap,
// otherwise the memory may not be freed correctly, currently only be constructed by faststring.
class OwnedSlice : private Allocator<false, false, false> {
class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator> {
public:
OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}

Expand Down
90 changes: 59 additions & 31 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@
#include "util/stack_util.h"
#include "util/uid_util.h"

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t size) const {
std::unordered_map<void*, size_t> RecordSizeMemoryAllocator::_allocated_sizes;
std::mutex RecordSizeMemoryAllocator::_mutex;

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_check(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'sys_memory_check' exceeds recommended size/complexity thresholds [readability-function-size]

void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_check(
                                                                         ^
Additional context

be/src/vec/common/allocator.cpp:46: 112 lines including whitespace and comments (threshold 80)

void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_check(
                                                                         ^

size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
Expand Down Expand Up @@ -155,8 +159,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
}
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(size_t size) const {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_tracker_check(
size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
Expand Down Expand Up @@ -191,24 +196,27 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz
}
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_check(size_t size) const {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_check(
size_t size) const {
sys_memory_check(size);
memory_tracker_check(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::consume_memory(size_t size) const {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory(
size_t size) const {
CONSUME_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::release_memory(size_t size) const {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory(
size_t size) const {
RELEASE_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::throw_bad_alloc(
const std::string& err) const {
LOG(WARNING) << err
<< fmt::format("{}, Stacktrace: {}",
Expand All @@ -219,9 +227,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
}

#ifndef NDEBUG
template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::add_address_sanitizers(void* buf,
size_t size) const {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::add_address_sanitizers(
void* buf, size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
Expand All @@ -230,8 +238,8 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::add_address_sanitizers(v
doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::remove_address_sanitizers(
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::remove_address_sanitizers(
void* buf, size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
Expand All @@ -242,23 +250,43 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::remove_address_sanitizer
}
#endif

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, size_t alignment) {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::alloc(size_t size,
size_t alignment) {
return alloc_impl(size, alignment);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf, size_t old_size,
size_t new_size,
size_t alignment) {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::realloc(
void* buf, size_t old_size, size_t new_size, size_t alignment) {
return realloc_impl(buf, old_size, new_size, alignment);
}

template class Allocator<true, true, true>;
template class Allocator<true, true, false>;
template class Allocator<true, false, true>;
template class Allocator<true, false, false>;
template class Allocator<false, true, true>;
template class Allocator<false, true, false>;
template class Allocator<false, false, true>;
template class Allocator<false, false, false>;
template class Allocator<true, true, true, DefaultMemoryAllocator>;
template class Allocator<true, true, false, DefaultMemoryAllocator>;
template class Allocator<true, false, true, DefaultMemoryAllocator>;
template class Allocator<true, false, false, DefaultMemoryAllocator>;
template class Allocator<false, true, true, DefaultMemoryAllocator>;
template class Allocator<false, true, false, DefaultMemoryAllocator>;
template class Allocator<false, false, true, DefaultMemoryAllocator>;
template class Allocator<false, false, false, DefaultMemoryAllocator>;

/** It would be better to put these Memory Allocators where they are used, such as in the orc memory pool and arrow memory pool.
* But currently allocators use templates in .cpp instead of all in .h, so they can only be placed here.
*/
template class Allocator<true, true, false, ORCMemoryAllocator>;
template class Allocator<true, false, true, ORCMemoryAllocator>;
template class Allocator<true, false, false, ORCMemoryAllocator>;
template class Allocator<false, true, true, ORCMemoryAllocator>;
template class Allocator<false, true, false, ORCMemoryAllocator>;
template class Allocator<false, false, true, ORCMemoryAllocator>;
template class Allocator<false, false, false, ORCMemoryAllocator>;

template class Allocator<true, true, true, RecordSizeMemoryAllocator>;
template class Allocator<true, true, false, RecordSizeMemoryAllocator>;
template class Allocator<true, false, true, RecordSizeMemoryAllocator>;
template class Allocator<true, false, false, RecordSizeMemoryAllocator>;
template class Allocator<false, true, true, RecordSizeMemoryAllocator>;
template class Allocator<false, true, false, RecordSizeMemoryAllocator>;
template class Allocator<false, false, true, RecordSizeMemoryAllocator>;
template class Allocator<false, false, false, RecordSizeMemoryAllocator>;
Loading