From c28752d51f30aa702ae35f30d99b1c26ee025561 Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Wed, 3 Jul 2024 19:58:44 +0800 Subject: [PATCH 1/5] [Feature](multi-catalog) Add memory tracker for orc reader and writer. --- be/src/runtime/exec_env.h | 14 ++++++ be/src/runtime/exec_env_init.cpp | 10 ++++ be/src/vec/exec/format/orc/orc_memory_pool.h | 53 ++++++++++++++++++++ be/src/vec/exec/format/orc/vorc_reader.cpp | 5 ++ be/src/vec/runtime/vorc_transformer.cpp | 17 +++++-- be/src/vec/runtime/vorc_transformer.h | 2 +- 6 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 be/src/vec/exec/format/orc/orc_memory_pool.h diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 41d8c74032649e..c916529d41e616 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -35,6 +35,10 @@ #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header #include "util/threadpool.h" +namespace orc { +class MemoryPool; +} + namespace doris { namespace vectorized { class VDataStreamMgr; @@ -186,6 +190,9 @@ class ExecEnv { } std::shared_ptr s3_file_buffer_tracker() { return _s3_file_buffer_tracker; } + std::shared_ptr orc_reader_tracker() { return _orc_reader_tracker; } + std::shared_ptr orc_writer_tracker() { return _orc_writer_tracker; } + ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } ThreadPool* buffered_reader_prefetch_thread_pool() { return _buffered_reader_prefetch_thread_pool.get(); @@ -305,6 +312,8 @@ class ExecEnv { segment_v2::TmpFileDirs* get_tmp_file_dirs() { return _tmp_file_dirs.get(); } + orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; } + private: ExecEnv(); @@ -353,6 +362,9 @@ class ExecEnv { std::shared_ptr _subcolumns_tree_tracker; std::shared_ptr _s3_file_buffer_tracker; + std::shared_ptr _orc_reader_tracker; + std::shared_ptr _orc_writer_tracker; + std::unique_ptr _send_batch_thread_pool; // Threadpool used to prefetch remote file for buffered 
reader std::unique_ptr _buffered_reader_prefetch_thread_pool; @@ -433,6 +445,8 @@ class ExecEnv { std::unique_ptr _pipeline_tracer_ctx; std::unique_ptr _tmp_file_dirs; doris::vectorized::SpillStreamManager* _spill_stream_mgr = nullptr; + + orc::MemoryPool* _orc_memory_pool = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index d9e21e1603ba2a..00efefdca31d9b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -93,6 +93,7 @@ #include "util/threadpool.h" #include "util/thrift_rpc_helper.h" #include "util/timezone_utils.h" +#include "vec/exec/format/orc/orc_memory_pool.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/sink/delta_writer_v2_pool.h" @@ -503,6 +504,9 @@ Status ExecEnv::_init_mem_env() { RETURN_IF_ERROR(_block_spill_mgr->init()); + // init orc memory pool + _orc_memory_pool = new doris::vectorized::ORCMemoryPool; + return Status::OK(); } @@ -527,6 +531,10 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree"); _s3_file_buffer_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); + _orc_reader_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "ORCReader"); + _orc_writer_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "ORCWriter"); } void ExecEnv::_register_metrics() { @@ -673,6 +681,8 @@ void ExecEnv::destroy() { // We should free task scheduler finally because task queue / scheduler maybe used by pipelineX. 
SAFE_DELETE(_without_group_task_scheduler); + SAFE_DELETE(_orc_memory_pool); + // dns cache is a global instance and need to be released at last SAFE_DELETE(_dns_cache); diff --git a/be/src/vec/exec/format/orc/orc_memory_pool.h b/be/src/vec/exec/format/orc/orc_memory_pool.h new file mode 100644 index 00000000000000..0d10edef9c45d9 --- /dev/null +++ b/be/src/vec/exec/format/orc/orc_memory_pool.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "orc/MemoryPool.hh" + +#if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) +extern "C" { +void* doris_malloc(size_t size) __THROW; +void doris_free(void* p) __THROW; +} +#endif // #if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) + +namespace doris::vectorized { + +class ORCMemoryPool : public orc::MemoryPool { +public: + char* malloc(uint64_t size) override { +#if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) + return reinterpret_cast(doris_malloc(size)); +#else + return reinterpret_cast(std::malloc(size)); +#endif // #if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) + } + + void free(char* p) override { +#if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) + doris_free(p); +#else + std::free(p); +#endif // #if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) + } + + ORCMemoryPool() = default; + ~ORCMemoryPool() override = default; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 16909f0023ae11..ff76947625430d 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -70,6 +70,7 @@ #include "vec/data_types/data_type_map.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_struct.h" +#include "vec/exec/format/orc/orc_memory_pool.h" #include "vec/exec/format/table/transactional_hive_common.h" #include "vec/exprs/vbloom_predicate.h" #include "vec/exprs/vdirect_in_predicate.h" @@ -177,6 +178,7 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r } OrcReader::~OrcReader() { + SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_reader_tracker()); if (_obj_pool && _obj_pool.get()) { _obj_pool->clear(); } @@ -252,6 +254,7 @@ Status OrcReader::_create_file_reader() { // create orc reader try { orc::ReaderOptions options; + options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool()); _reader = 
orc::createReader( std::unique_ptr(_file_input_stream.release()), options); } catch (std::exception& e) { @@ -276,6 +279,7 @@ Status OrcReader::init_reader( const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { + SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_reader_tracker()); _column_names = column_names; _colname_to_value_range = colname_to_value_range; _lazy_read_ctx.conjuncts = conjuncts; @@ -1539,6 +1543,7 @@ std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_reader_tracker()); RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof)); if (_orc_filter) { RETURN_IF_ERROR(_orc_filter->get_status()); diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp index 09bae276d65d18..b3761ca6fc4be1 100644 --- a/be/src/vec/runtime/vorc_transformer.cpp +++ b/be/src/vec/runtime/vorc_transformer.cpp @@ -31,7 +31,9 @@ #include "orc/OrcFile.hh" #include "orc/Vector.hh" #include "runtime/define_primitive_type.h" +#include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "runtime/types.h" #include "util/binary_cast.hpp" #include "util/debug_util.h" @@ -122,7 +124,12 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* fil set_compression_type(compress_type); } +VOrcTransformer::~VOrcTransformer() { + SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker()); +} + Status VOrcTransformer::open() { + SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker()); if (!_schema_str.empty()) { try { _schema = orc::Type::buildTypeFromString(_schema_str); @@ -151,6 +158,7 @@ Status VOrcTransformer::open() { _output_stream = 
std::make_unique(_file_writer); try { + _write_options->setMemoryPool(ExecEnv::GetInstance()->orc_memory_pool()); _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options); } catch (const std::exception& e) { return Status::InternalError("failed to create writer: {}", e.what()); @@ -314,6 +322,7 @@ int64_t VOrcTransformer::written_len() { } Status VOrcTransformer::close() { + SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker()); if (_writer != nullptr) { try { _writer->close(); @@ -332,6 +341,8 @@ Status VOrcTransformer::write(const Block& block) { return Status::OK(); } + SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker()); + // Buffer used by date/datetime/datev2/datetimev2/largeint type std::vector buffer_list; Defer defer {[&]() { @@ -353,13 +364,13 @@ Status VOrcTransformer::write(const Block& block) { RETURN_IF_ERROR(_serdes[i]->write_column_to_orc( _state->timezone(), *raw_column, nullptr, root->fields[i], 0, sz, buffer_list)); } + root->numElements = sz; + _writer->add(*row_batch); + _cur_written_rows += sz; } catch (const std::exception& e) { LOG(WARNING) << "Orc write error: " << e.what(); return Status::InternalError(e.what()); } - root->numElements = sz; - _writer->add(*row_batch); - _cur_written_rows += sz; return Status::OK(); } diff --git a/be/src/vec/runtime/vorc_transformer.h b/be/src/vec/runtime/vorc_transformer.h index 134e949e76cabd..dff98ac8903b5f 100644 --- a/be/src/vec/runtime/vorc_transformer.h +++ b/be/src/vec/runtime/vorc_transformer.h @@ -84,7 +84,7 @@ class VOrcTransformer final : public VFileFormatTransformer { TFileCompressType::type compression, const iceberg::Schema* iceberg_schema = nullptr); - ~VOrcTransformer() = default; + ~VOrcTransformer(); Status open() override; From 82744a4fc4dd4097a3cae4ec0d9f4b1cd2565de8 Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Tue, 9 Jul 2024 15:12:06 +0800 Subject: [PATCH 2/5] update --- be/src/runtime/exec_env.h | 11 +- 
be/src/runtime/exec_env_init.cpp | 9 +- be/src/util/faststring.h | 2 +- be/src/util/slice.h | 2 +- be/src/vec/common/allocator.cpp | 85 +++++--- be/src/vec/common/allocator.h | 193 ++++++++++++++++- be/src/vec/common/allocator_fwd.h | 4 +- be/src/vec/common/hash_table/phmap_fwd_decl.h | 2 +- be/src/vec/exec/format/orc/orc_memory_pool.h | 32 +-- be/src/vec/exec/format/orc/vorc_reader.cpp | 3 - .../exec/format/parquet/arrow_memory_pool.cpp | 74 +++++++ .../exec/format/parquet/arrow_memory_pool.h | 204 ++++++++++++++++++ be/src/vec/runtime/vorc_transformer.cpp | 23 +- be/src/vec/runtime/vorc_transformer.h | 2 +- be/src/vec/runtime/vparquet_transformer.cpp | 15 +- 15 files changed, 570 insertions(+), 91 deletions(-) create mode 100644 be/src/vec/exec/format/parquet/arrow_memory_pool.cpp create mode 100644 be/src/vec/exec/format/parquet/arrow_memory_pool.h diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index c916529d41e616..d8f7427c1f667b 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -38,6 +38,9 @@ namespace orc { class MemoryPool; } +namespace arrow { +class MemoryPool; +} namespace doris { namespace vectorized { @@ -190,9 +193,6 @@ class ExecEnv { } std::shared_ptr s3_file_buffer_tracker() { return _s3_file_buffer_tracker; } - std::shared_ptr orc_reader_tracker() { return _orc_reader_tracker; } - std::shared_ptr orc_writer_tracker() { return _orc_writer_tracker; } - ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } ThreadPool* buffered_reader_prefetch_thread_pool() { return _buffered_reader_prefetch_thread_pool.get(); @@ -313,6 +313,7 @@ class ExecEnv { segment_v2::TmpFileDirs* get_tmp_file_dirs() { return _tmp_file_dirs.get(); } orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; } + arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; } private: ExecEnv(); @@ -362,9 +363,6 @@ class ExecEnv { std::shared_ptr _subcolumns_tree_tracker; std::shared_ptr 
_s3_file_buffer_tracker; - std::shared_ptr _orc_reader_tracker; - std::shared_ptr _orc_writer_tracker; - std::unique_ptr _send_batch_thread_pool; // Threadpool used to prefetch remote file for buffered reader std::unique_ptr _buffered_reader_prefetch_thread_pool; @@ -447,6 +445,7 @@ class ExecEnv { doris::vectorized::SpillStreamManager* _spill_stream_mgr = nullptr; orc::MemoryPool* _orc_memory_pool = nullptr; + arrow::MemoryPool* _arrow_memory_pool = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 00efefdca31d9b..c75d7b3a8fc09b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -94,6 +94,7 @@ #include "util/thrift_rpc_helper.h" #include "util/timezone_utils.h" #include "vec/exec/format/orc/orc_memory_pool.h" +#include "vec/exec/format/parquet/arrow_memory_pool.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/sink/delta_writer_v2_pool.h" @@ -505,7 +506,8 @@ Status ExecEnv::_init_mem_env() { RETURN_IF_ERROR(_block_spill_mgr->init()); // init orc memory pool - _orc_memory_pool = new doris::vectorized::ORCMemoryPool; + _orc_memory_pool = new doris::vectorized::ORCMemoryPool(); + _arrow_memory_pool = new doris::vectorized::ArrowMemoryPool(); return Status::OK(); } @@ -531,10 +533,6 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree"); _s3_file_buffer_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); - _orc_reader_tracker = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "ORCReader"); - _orc_writer_tracker = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "ORCWriter"); } void ExecEnv::_register_metrics() { @@ -681,6 +679,7 @@ void ExecEnv::destroy() { // We should free task scheduler finally because task queue / scheduler maybe used by pipelineX. 
SAFE_DELETE(_without_group_task_scheduler); + SAFE_DELETE(_arrow_memory_pool); SAFE_DELETE(_orc_memory_pool); // dns cache is a global instance and need to be released at last diff --git a/be/src/util/faststring.h b/be/src/util/faststring.h index 4edcce9836df0e..9308a4d20bbb1c 100644 --- a/be/src/util/faststring.h +++ b/be/src/util/faststring.h @@ -35,7 +35,7 @@ namespace doris { // common use cases (in particular, resize() will fill with uninitialized data // instead of memsetting to \0) // only build() can transfer data to the outside. -class faststring : private Allocator { +class faststring : private Allocator { public: enum { kInitialCapacity = 32 }; diff --git a/be/src/util/slice.h b/be/src/util/slice.h index 80f9616f3da2bd..bae33d4ee75010 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -340,7 +340,7 @@ struct SliceMap { // // only receive the memory allocated by Allocator and disables mmap, // otherwise the memory may not be freed correctly, currently only be constructed by faststring. 
-class OwnedSlice : private Allocator { +class OwnedSlice : private Allocator { public: OwnedSlice() : _slice((uint8_t*)nullptr, 0) {} diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 35418dea08bc89..c7d9a30d1d47d0 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -37,8 +37,12 @@ #include "util/stack_util.h" #include "util/uid_util.h" -template -void Allocator::sys_memory_check(size_t size) const { +std::unordered_map RecordSizeMemoryAllocator::_allocated_sizes; +std::mutex RecordSizeMemoryAllocator::_mutex; + +template +void Allocator::sys_memory_check( + size_t size) const { #ifdef BE_TEST if (!doris::ExecEnv::ready()) { return; @@ -132,8 +136,9 @@ void Allocator::sys_memory_check(size_t } } -template -void Allocator::memory_tracker_check(size_t size) const { +template +void Allocator::memory_tracker_check( + size_t size) const { #ifdef BE_TEST if (!doris::ExecEnv::ready()) { return; @@ -168,24 +173,27 @@ void Allocator::memory_tracker_check(siz } } -template -void Allocator::memory_check(size_t size) const { +template +void Allocator::memory_check( + size_t size) const { sys_memory_check(size); memory_tracker_check(size); } -template -void Allocator::consume_memory(size_t size) const { +template +void Allocator::consume_memory( + size_t size) const { CONSUME_THREAD_MEM_TRACKER(size); } -template -void Allocator::release_memory(size_t size) const { +template +void Allocator::release_memory( + size_t size) const { RELEASE_THREAD_MEM_TRACKER(size); } -template -void Allocator::throw_bad_alloc( +template +void Allocator::throw_bad_alloc( const std::string& err) const { LOG(WARNING) << err << fmt::format( @@ -199,23 +207,48 @@ void Allocator::throw_bad_alloc( throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } -template -void* Allocator::alloc(size_t size, size_t alignment) { +template +void* Allocator::alloc(size_t size, + size_t alignment) { return alloc_impl(size, alignment); } 
-template -void* Allocator::realloc(void* buf, size_t old_size, - size_t new_size, - size_t alignment) { +template +void* Allocator::realloc( + void* buf, size_t old_size, size_t new_size, size_t alignment) { return realloc_impl(buf, old_size, new_size, alignment); } -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +#if defined(USE_JEMALLOC) +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +#endif // #if defined(USE_JEMALLOC) +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index f805ab445a6ac6..c85eeb233351ff 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -23,6 +23,7 @@ // TODO: Readable #include +#include #include #include @@ -68,6 +69,159 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; // is always a multiple of sixteen. 
(https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html) static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; +class DefaultMemoryAllocator { +public: + static void* malloc(size_t size) __THROW { return std::malloc(size); } + + static void* calloc(size_t n, size_t size) __THROW { return std::calloc(n, size); } + + static constexpr bool need_record_actual_size() { return false; } + + /*static size_t allocated_size(void* ptr) { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + }*/ + + static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { + return ::posix_memalign(ptr, alignment, size); + } + + static void* realloc(void* ptr, size_t size) __THROW { return std::realloc(ptr, size); } + + static void free(void* p) __THROW { std::free(p); } + + static void release_unused() {} + +private: +}; + +class ORCMemoryAllocator { +public: + static void* malloc(size_t size) __THROW { return reinterpret_cast(std::malloc(size)); } + + static void* calloc(size_t n, size_t size) __THROW { return std::calloc(n, size); } + + static constexpr bool need_record_actual_size() { return true; } + + static size_t allocated_size(void* ptr) { return malloc_usable_size(ptr); } + + static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { + return ::posix_memalign(ptr, alignment, size); + } + + static void* realloc(void* ptr, size_t size) __THROW { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + + static void free(void* p) __THROW { std::free(p); } + + static void release_unused() {} +}; + +class RecordSizeMemoryAllocator { +public: + static void* malloc(size_t size) __THROW { + void* p = std::malloc(size); + if (p) { + std::lock_guard lock(_mutex); + _allocated_sizes[p] = size; + } + return p; + } + + static void* calloc(size_t n, size_t size) __THROW { + void* p = std::calloc(n, size); + if (p) { + std::lock_guard lock(_mutex); + _allocated_sizes[p] = n * size; + } + return p; + } + + 
static constexpr bool need_record_actual_size() { return false; } + + static size_t allocated_size(void* ptr) { + std::lock_guard lock(_mutex); + auto it = _allocated_sizes.find(ptr); + if (it != _allocated_sizes.end()) { + return it->second; + } + return 0; + } + + static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { + int ret = ::posix_memalign(ptr, alignment, size); + if (ret == 0 && *ptr) { + std::lock_guard lock(_mutex); + _allocated_sizes[*ptr] = size; + } + return ret; + } + + static void* realloc(void* ptr, size_t size) __THROW { + std::lock_guard lock(_mutex); + void* p = std::realloc(ptr, size); + if (p) { + _allocated_sizes.erase(ptr); + _allocated_sizes[p] = size; + } + return p; + } + + static void free(void* p) __THROW { + if (p) { + std::lock_guard lock(_mutex); + _allocated_sizes.erase(p); + std::free(p); + } + } + + static void release_unused() {} + +private: + static std::unordered_map _allocated_sizes; + static std::mutex _mutex; +}; + +#if defined(USE_JEMALLOC) +#include +/*extern "C" { +void* je_arrow_malloc(size_t size) __THROW; +void* je_arrow_calloc(size_t n, size_t size) __THROW; +int je_arrow_posix_memalign(void** ptr, size_t alignment, size_t size) __THROW; +void* je_arrow_realloc(void* p, size_t size) __THROW; +void je_arrow_free(void* p) __THROW; +size_t je_arrow_malloc_usable_size(void* ptr) __THROW; +int je_arrow_mallctl(const char* name, void* oldp, size_t* oldlenp, void* newp, size_t newlen); +}*/ +class ArrowJemallocMemoryAllocator { +public: + static void* malloc(size_t size) __THROW { return je_malloc(size); } + + static void* calloc(size_t n, size_t size) __THROW { return je_calloc(n, size); } + + static constexpr bool need_record_actual_size() { return false; } + + /*static size_t allocated_size(void* ptr) { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + }*/ + + static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { + return je_posix_memalign(ptr, 
alignment, size); + } + + static void* realloc(void* ptr, size_t size) __THROW { return je_realloc(ptr, size); } + + static void free(void* p) __THROW { je_free(p); } + + static void release_unused() { + je_mallctl(fmt::format("arena.{}.purge", MALLCTL_ARENAS_ALL).c_str(), NULL, NULL, NULL, 0); + } +}; +#endif // defined(USE_JEMALLOC) + /** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena. * Also used in hash tables. * The interface is different from std::allocator @@ -78,7 +232,7 @@ static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; * - random hint address for mmap * - mmap_threshold for using mmap less or more */ -template +template class Allocator { public: void sys_memory_check(size_t size) const; @@ -99,6 +253,7 @@ class Allocator { memory_check(size); consume_memory(size); void* buf; + size_t record_size = size; if (use_mmap && size >= doris::config::mmap_threshold) { if (alignment > MMAP_MIN_ALIGNMENT) @@ -112,47 +267,67 @@ class Allocator { release_memory(size); throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.", size)); } + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); + } /// No need for zero-fill, because mmap guarantees it. 
} else { if (alignment <= MALLOC_MIN_ALIGNMENT) { if constexpr (clear_memory) - buf = ::calloc(size, 1); + buf = MemoryAllocator::calloc(size, 1); else - buf = ::malloc(size); + buf = MemoryAllocator::malloc(size); if (nullptr == buf) { release_memory(size); - throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size)); + throw_bad_alloc( + fmt::format("Allocator: Cannot malloc {}.", size)); // overcommit = 1 + } + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); } } else { buf = nullptr; - int res = posix_memalign(&buf, alignment, size); + int res = MemoryAllocator::posix_memalign(&buf, alignment, size); if (0 != res) { release_memory(size); throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } - - if constexpr (clear_memory) memset(buf, 0, size); + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); + } } } + if constexpr (MemoryAllocator::need_record_actual_size()) { + consume_memory(record_size - size); + } return buf; } /// Free memory range. void free(void* buf, size_t size) { + /*size_t record_size = 0; + if (size) { + record_size = size; + } else { + record_size = MemoryAllocator::allocated_size(buf); + }*/ if (use_mmap && size >= doris::config::mmap_threshold) { if (0 != munmap(buf, size)) { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); } } else { - ::free(buf); + MemoryAllocator::free(buf); } + //release_memory(record_size); release_memory(size); } + void release_unused() { MemoryAllocator::release_unused(); } + /** Enlarge memory range. * Data from old range is moved to the beginning of new range. * Address of memory range could change. @@ -170,7 +345,7 @@ class Allocator { (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)) { /// Resize malloc'd memory region with no special alignment requirement. 
- void* new_buf = ::realloc(buf, new_size); + void* new_buf = MemoryAllocator::realloc(buf, new_size); if (nullptr == new_buf) { release_memory(new_size - old_size); throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, diff --git a/be/src/vec/common/allocator_fwd.h b/be/src/vec/common/allocator_fwd.h index 988f7a5c7af041..da43030a13389d 100644 --- a/be/src/vec/common/allocator_fwd.h +++ b/be/src/vec/common/allocator_fwd.h @@ -24,7 +24,9 @@ #pragma once #include -template +class DefaultMemoryAllocator; +template class Allocator; template diff --git a/be/src/vec/common/hash_table/phmap_fwd_decl.h b/be/src/vec/common/hash_table/phmap_fwd_decl.h index 62373410968549..c6ff77a7e71d01 100644 --- a/be/src/vec/common/hash_table/phmap_fwd_decl.h +++ b/be/src/vec/common/hash_table/phmap_fwd_decl.h @@ -26,7 +26,7 @@ namespace doris::vectorized { /// `Allocator_` implements several interfaces of `std::allocator` /// which `phmap::flat_hash_map` will use. template -class Allocator_ : private Allocator { +class Allocator_ : private Allocator { public: using value_type = T; using pointer = T*; diff --git a/be/src/vec/exec/format/orc/orc_memory_pool.h b/be/src/vec/exec/format/orc/orc_memory_pool.h index 0d10edef9c45d9..1df3d63f95291e 100644 --- a/be/src/vec/exec/format/orc/orc_memory_pool.h +++ b/be/src/vec/exec/format/orc/orc_memory_pool.h @@ -18,36 +18,36 @@ #pragma once #include "orc/MemoryPool.hh" +#include "vec/common/allocator.h" -#if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) -extern "C" { -void* doris_malloc(size_t size) __THROW; -void doris_free(void* p) __THROW; -} -#endif // #if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) +#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER) +using ORC_MEMORY_ALLOCATOR = RecordSizeMemoryAllocator; +#else +using ORC_MEMORY_ALLOCATOR = ORCMemoryAllocator; +#endif namespace doris::vectorized { class ORCMemoryPool : public orc::MemoryPool { public: char* 
malloc(uint64_t size) override { -#if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) - return reinterpret_cast(doris_malloc(size)); -#else - return reinterpret_cast(std::malloc(size)); -#endif // #if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) + char* p = reinterpret_cast(_allocator.alloc(size)); + return p; } void free(char* p) override { -#if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) - doris_free(p); -#else - std::free(p); -#endif // #if defined(USE_JEMALLOC) && defined(USE_MEM_TRACKER) + if (p == nullptr) { + return; + } + size_t size = ORC_MEMORY_ALLOCATOR::allocated_size(p); + _allocator.free(p, size); } ORCMemoryPool() = default; ~ORCMemoryPool() override = default; + +private: + Allocator _allocator; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index ff76947625430d..186a720ad7aab8 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -178,7 +178,6 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r } OrcReader::~OrcReader() { - SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_reader_tracker()); if (_obj_pool && _obj_pool.get()) { _obj_pool->clear(); } @@ -279,7 +278,6 @@ Status OrcReader::init_reader( const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { - SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_reader_tracker()); _column_names = column_names; _colname_to_value_range = colname_to_value_range; _lazy_read_ctx.conjuncts = conjuncts; @@ -1543,7 +1541,6 @@ std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_reader_tracker()); RETURN_IF_ERROR(get_next_block_impl(block, 
read_rows, eof)); if (_orc_filter) { RETURN_IF_ERROR(_orc_filter->get_status()); diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp new file mode 100644 index 00000000000000..a26e54513f930b --- /dev/null +++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/parquet/arrow_memory_pool.h" + +#include "glog/logging.h" + +namespace doris::vectorized { + +// A static piece of memory for 0-size allocations, so as to return +// an aligned non-null pointer. Note the correct value for DebugAllocator +// checks is hardcoded. 
+alignas(kDefaultBufferAlignment) int64_t zero_size_area[1] = {kDebugXorSuffix}; + +arrow::Status ArrowAllocator::allocate_aligned(int64_t size, int64_t alignment, uint8_t** out) { + if (size == 0) { + *out = kZeroSizeArea; + return arrow::Status::OK(); + } + *out = reinterpret_cast(_allocator.alloc(size, alignment)); + if (*out == NULL) { + return arrow::Status::OutOfMemory("malloc of size ", size, " failed"); + } + return arrow::Status::OK(); +} + +arrow::Status ArrowAllocator::reallocate_aligned(int64_t old_size, int64_t new_size, + int64_t alignment, uint8_t** ptr) { + uint8_t* previous_ptr = *ptr; + if (previous_ptr == kZeroSizeArea) { + DCHECK_EQ(old_size, 0); + return allocate_aligned(new_size, alignment, ptr); + } + if (new_size == 0) { + deallocate_aligned(previous_ptr, old_size, alignment); + *ptr = kZeroSizeArea; + return arrow::Status::OK(); + } + *ptr = reinterpret_cast(_allocator.realloc(*ptr, static_cast(old_size), + static_cast(new_size), alignment)); + if (*ptr == NULL) { + *ptr = previous_ptr; + return arrow::Status::OutOfMemory("realloc of size ", new_size, " failed"); + } + return arrow::Status::OK(); +} + +void ArrowAllocator::deallocate_aligned(uint8_t* ptr, int64_t size, int64_t alignment) { + if (ptr == kZeroSizeArea) { + DCHECK_EQ(size, 0); + } else { + _allocator.free(ptr, static_cast(size)); + } +} + +void ArrowAllocator::release_unused() { + _allocator.release_unused(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.h b/be/src/vec/exec/format/parquet/arrow_memory_pool.h new file mode 100644 index 00000000000000..8d26c041dbf559 --- /dev/null +++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.h @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/memory_pool.h" +#include "arrow/status.h" +#include "vec/common/allocator.h" +#include "vec/common/allocator_fwd.h" + +namespace doris::vectorized { + +constexpr int64_t kDefaultBufferAlignment = 64; +static constexpr int64_t kDebugXorSuffix = -0x181fe80e0b464188LL; +#ifndef NDEBUG +static constexpr uint8_t kAllocPoison = 0xBC; +static constexpr uint8_t kReallocPoison = 0xBD; +static constexpr uint8_t kDeallocPoison = 0xBE; +#endif + +// A static piece of memory for 0-size allocations, so as to return +// an aligned non-null pointer. Note the correct value for DebugAllocator +// checks is hardcoded. 
+extern int64_t zero_size_area[1]; +static uint8_t* const kZeroSizeArea = reinterpret_cast(&zero_size_area); + +#if defined(USE_JEMALLOC) +#include +using ARROW_MEMORY_ALLOCATOR = ArrowJemallocMemoryAllocator; +#else +using ARROW_MEMORY_ALLOCATOR = DefaultMemoryAllocator; +#endif // #if defined(USE_JEMALLOC) + +class ArrowAllocator { +public: + arrow::Status allocate_aligned(int64_t size, int64_t alignment, uint8_t** out); + arrow::Status reallocate_aligned(int64_t old_size, int64_t new_size, int64_t alignment, + uint8_t** ptr); + void deallocate_aligned(uint8_t* ptr, int64_t size, int64_t alignment); + void release_unused(); + +private: + Allocator _allocator; +}; + +/////////////////////////////////////////////////////////////////////// +// Helper tracking memory statistics + +/// \brief Memory pool statistics +/// +/// 64-byte aligned so that all atomic values are on the same cache line. +class alignas(64) ArrowMemoryPoolStats { +private: + // All atomics are updated according to Acquire-Release ordering. + // https://en.cppreference.com/w/cpp/atomic/memory_order#Release-Acquire_ordering + // + // max_memory_, total_allocated_bytes_, and num_allocs_ only go up (they are + // monotonically increasing) which can allow some optimizations. + std::atomic max_memory_ {0}; + std::atomic bytes_allocated_ {0}; + std::atomic total_allocated_bytes_ {0}; + std::atomic num_allocs_ {0}; + +public: + int64_t max_memory() const { return max_memory_.load(std::memory_order_acquire); } + + int64_t bytes_allocated() const { return bytes_allocated_.load(std::memory_order_acquire); } + + int64_t total_bytes_allocated() const { + return total_allocated_bytes_.load(std::memory_order_acquire); + } + + int64_t num_allocations() const { return num_allocs_.load(std::memory_order_acquire); } + + inline void did_allocate_bytes(int64_t size) { + // Issue the load before everything else. max_memory_ is monotonically increasing, + // so we can use a relaxed load before the read-modify-write. 
+ auto max_memory = max_memory_.load(std::memory_order_relaxed); + const auto old_bytes_allocated = + bytes_allocated_.fetch_add(size, std::memory_order_acq_rel); + // Issue store operations on values that we don't depend on to proceed + // with execution. When done, max_memory and old_bytes_allocated have + // a higher chance of being available on CPU registers. This also has the + // nice side-effect of putting 3 atomic stores close to each other in the + // instruction stream. + total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel); + num_allocs_.fetch_add(1, std::memory_order_acq_rel); + + // If other threads are updating max_memory_ concurrently we leave the loop without + // updating knowing that it already reached a value even higher than ours. + const auto allocated = old_bytes_allocated + size; + while (max_memory < allocated && + !max_memory_.compare_exchange_weak( + /*expected=*/max_memory, /*desired=*/allocated, std::memory_order_acq_rel)) { + } + } + + inline void did_reallocate_bytes(int64_t old_size, int64_t new_size) { + if (new_size > old_size) { + did_allocate_bytes(new_size - old_size); + } else { + did_free_bytes(old_size - new_size); + } + } + + inline void did_free_bytes(int64_t size) { + bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel); + } +}; + +template +class ArrowMemoryPool : public arrow::MemoryPool { +public: + ~ArrowMemoryPool() override = default; + + arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override { + if (size < 0) { + return arrow::Status::Invalid("negative malloc size"); + } + if (static_cast(size) >= std::numeric_limits::max()) { + return arrow::Status::OutOfMemory("malloc size overflows size_t"); + } + RETURN_NOT_OK(_allocator.allocate_aligned(size, alignment, out)); +#ifndef NDEBUG + // Poison data + if (size > 0) { + DCHECK_NE(*out, nullptr); + (*out)[0] = kAllocPoison; + (*out)[size - 1] = kAllocPoison; + } +#endif + + _stats.did_allocate_bytes(size); + return 
arrow::Status::OK(); + } + + arrow::Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, + uint8_t** ptr) override { + if (new_size < 0) { + return arrow::Status::Invalid("negative realloc size"); + } + if (static_cast(new_size) >= std::numeric_limits::max()) { + return arrow::Status::OutOfMemory("realloc overflows size_t"); + } + RETURN_NOT_OK(_allocator.reallocate_aligned(old_size, new_size, alignment, ptr)); +#ifndef NDEBUG + // Poison data + if (new_size > old_size) { + DCHECK_NE(*ptr, nullptr); + (*ptr)[old_size] = kReallocPoison; + (*ptr)[new_size - 1] = kReallocPoison; + } +#endif + + _stats.did_reallocate_bytes(old_size, new_size); + return arrow::Status::OK(); + } + + void Free(uint8_t* buffer, int64_t size, int64_t alignment) override { +#ifndef NDEBUG + // Poison data + if (size > 0) { + DCHECK_NE(buffer, nullptr); + buffer[0] = kDeallocPoison; + buffer[size - 1] = kDeallocPoison; + } +#endif + _allocator.deallocate_aligned(buffer, size, alignment); + + _stats.did_free_bytes(size); + } + + void ReleaseUnused() override { _allocator.release_unused(); } + + int64_t bytes_allocated() const override { return _stats.bytes_allocated(); } + + int64_t max_memory() const override { return _stats.max_memory(); } + + int64_t total_bytes_allocated() const override { return _stats.total_bytes_allocated(); } + + int64_t num_allocations() const override { return _stats.num_allocations(); } + + std::string backend_name() const override { return "ArrowMemoryPool"; } + +protected: + ArrowMemoryPoolStats _stats; + Allocator _allocator; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp index b3761ca6fc4be1..6c512a94373d20 100644 --- a/be/src/vec/runtime/vorc_transformer.cpp +++ b/be/src/vec/runtime/vorc_transformer.cpp @@ -33,7 +33,6 @@ #include "runtime/define_primitive_type.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" -#include 
"runtime/thread_context.h" #include "runtime/types.h" #include "util/binary_cast.hpp" #include "util/debug_util.h" @@ -124,12 +123,7 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* fil set_compression_type(compress_type); } -VOrcTransformer::~VOrcTransformer() { - SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker()); -} - Status VOrcTransformer::open() { - SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker()); if (!_schema_str.empty()) { try { _schema = orc::Type::buildTypeFromString(_schema_str); @@ -322,16 +316,15 @@ int64_t VOrcTransformer::written_len() { } Status VOrcTransformer::close() { - SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker()); - if (_writer != nullptr) { - try { + try { + if (_writer != nullptr) { _writer->close(); - } catch (const std::exception& e) { - return Status::IOError(e.what()); } - } - if (_output_stream) { - _output_stream->close(); + if (_output_stream) { + _output_stream->close(); + } + } catch (const std::exception& e) { + return Status::IOError(e.what()); } return Status::OK(); } @@ -341,8 +334,6 @@ Status VOrcTransformer::write(const Block& block) { return Status::OK(); } - SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker()); - // Buffer used by date/datetime/datev2/datetimev2/largeint type std::vector buffer_list; Defer defer {[&]() { diff --git a/be/src/vec/runtime/vorc_transformer.h b/be/src/vec/runtime/vorc_transformer.h index dff98ac8903b5f..134e949e76cabd 100644 --- a/be/src/vec/runtime/vorc_transformer.h +++ b/be/src/vec/runtime/vorc_transformer.h @@ -84,7 +84,7 @@ class VOrcTransformer final : public VFileFormatTransformer { TFileCompressType::type compression, const iceberg::Schema* iceberg_schema = nullptr); - ~VOrcTransformer(); + ~VOrcTransformer() = default; Status open() override; diff --git a/be/src/vec/runtime/vparquet_transformer.cpp 
b/be/src/vec/runtime/vparquet_transformer.cpp index 116a898c4f1c39..1969858349f0e9 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -42,6 +42,7 @@ #include "olap/olap_common.h" #include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" +#include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/types.h" #include "util/arrow/block_convertor.h" @@ -237,6 +238,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri Status VParquetTransformer::_parse_properties() { try { + arrow::MemoryPool* pool = ExecEnv::GetInstance()->arrow_memory_pool(); parquet::WriterProperties::Builder builder; ParquetBuildHelper::build_compression_type(builder, _compression_type); ParquetBuildHelper::build_version(builder, _parquet_version); @@ -248,6 +250,7 @@ Status VParquetTransformer::_parse_properties() { builder.created_by( fmt::format("{}({})", doris::get_short_version(), parquet::DEFAULT_CREATED_BY)); builder.max_row_group_length(std::numeric_limits::max()); + builder.memory_pool(pool); _parquet_writer_properties = builder.build(); _arrow_properties = parquet::ArrowWriterProperties::Builder() .enable_deprecated_int96_timestamps() @@ -292,8 +295,9 @@ Status VParquetTransformer::write(const Block& block) { // serialize std::shared_ptr result; - RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, _state->timezone_obj())); + RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, + ExecEnv::GetInstance()->arrow_memory_pool(), &result, + _state->timezone_obj())); RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result)); _write_size += block.bytes(); @@ -305,9 +309,10 @@ Status VParquetTransformer::write(const Block& block) { } arrow::Status VParquetTransformer::_open_file_writer() { - ARROW_ASSIGN_OR_RAISE(_writer, parquet::arrow::FileWriter::Open( - *_arrow_schema, arrow::default_memory_pool(), 
_outstream, - _parquet_writer_properties, _arrow_properties)); + ARROW_ASSIGN_OR_RAISE(_writer, + parquet::arrow::FileWriter::Open( + *_arrow_schema, ExecEnv::GetInstance()->arrow_memory_pool(), + _outstream, _parquet_writer_properties, _arrow_properties)); return arrow::Status::OK(); } From 2f0221749c020d5fe2a3072ca68bbf7c1b6406f2 Mon Sep 17 00:00:00 2001 From: chenqi Date: Wed, 17 Jul 2024 15:00:52 +0800 Subject: [PATCH 3/5] clear_memory in orc memory pool temporarily. --- be/src/vec/exec/format/orc/orc_memory_pool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exec/format/orc/orc_memory_pool.h b/be/src/vec/exec/format/orc/orc_memory_pool.h index 1df3d63f95291e..00132e58edc793 100644 --- a/be/src/vec/exec/format/orc/orc_memory_pool.h +++ b/be/src/vec/exec/format/orc/orc_memory_pool.h @@ -47,7 +47,7 @@ class ORCMemoryPool : public orc::MemoryPool { ~ORCMemoryPool() override = default; private: - Allocator _allocator; + Allocator _allocator; }; } // namespace doris::vectorized From 608f36c6fc49b4c63eccc19ac4ba38acccfb2bc3 Mon Sep 17 00:00:00 2001 From: chenqi Date: Fri, 19 Jul 2024 15:02:05 +0800 Subject: [PATCH 4/5] update2 --- be/src/vec/common/allocator.cpp | 4 +++ be/src/vec/common/allocator.h | 25 +++---------------- be/src/vec/exec/format/orc/orc_memory_pool.h | 2 +- .../exec/format/parquet/arrow_memory_pool.cpp | 4 +-- 4 files changed, 11 insertions(+), 24 deletions(-) diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index c7d9a30d1d47d0..42caeb80c738cd 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -227,6 +227,10 @@ template class Allocator; template class Allocator; template class Allocator; template class Allocator; + +/** It would be better to put these Memory Allocators where they are used, such as in the orc memory pool and arrow memory pool. + * But currently allocators use templates in .cpp instead of all in .h, so they can only be placed here.
+ */ template class Allocator; template class Allocator; template class Allocator; diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index c85eeb233351ff..4145b7bb2d1e78 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -77,11 +77,6 @@ class DefaultMemoryAllocator { static constexpr bool need_record_actual_size() { return false; } - /*static size_t allocated_size(void* ptr) { - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - }*/ - static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { return ::posix_memalign(ptr, alignment, size); } @@ -95,6 +90,9 @@ class DefaultMemoryAllocator { private: }; +/** It would be better to put these Memory Allocators where they are used, such as in the orc memory pool and arrow memory pool. + * But currently allocators use templates in .cpp instead of all in .h, so they can only be placed here. + */ class ORCMemoryAllocator { public: static void* malloc(size_t size) __THROW { return reinterpret_cast(std::malloc(size)); } @@ -186,15 +184,6 @@ class RecordSizeMemoryAllocator { #if defined(USE_JEMALLOC) #include -/*extern "C" { -void* je_arrow_malloc(size_t size) __THROW; -void* je_arrow_calloc(size_t n, size_t size) __THROW; -int je_arrow_posix_memalign(void** ptr, size_t alignment, size_t size) __THROW; -void* je_arrow_realloc(void* p, size_t size) __THROW; -void je_arrow_free(void* p) __THROW; -size_t je_arrow_malloc_usable_size(void* ptr) __THROW; -int je_arrow_mallctl(const char* name, void* oldp, size_t* oldlenp, void* newp, size_t newlen); -}*/ class ArrowJemallocMemoryAllocator { public: static void* malloc(size_t size) __THROW { return je_malloc(size); } @@ -203,11 +192,6 @@ class ArrowJemallocMemoryAllocator { static constexpr bool need_record_actual_size() { return false; } - /*static size_t allocated_size(void* ptr) { - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - }*/ - static int posix_memalign(void** 
ptr, size_t alignment, size_t size) __THROW { return je_posix_memalign(ptr, alignment, size); } @@ -281,8 +265,7 @@ class Allocator { if (nullptr == buf) { release_memory(size); - throw_bad_alloc( - fmt::format("Allocator: Cannot malloc {}.", size)); // overcommit = 1 + throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size)); } if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); diff --git a/be/src/vec/exec/format/orc/orc_memory_pool.h b/be/src/vec/exec/format/orc/orc_memory_pool.h index 00132e58edc793..1df3d63f95291e 100644 --- a/be/src/vec/exec/format/orc/orc_memory_pool.h +++ b/be/src/vec/exec/format/orc/orc_memory_pool.h @@ -47,7 +47,7 @@ class ORCMemoryPool : public orc::MemoryPool { ~ORCMemoryPool() override = default; private: - Allocator _allocator; + Allocator _allocator; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp index a26e54513f930b..ed06e5c821aba7 100644 --- a/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp +++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp @@ -32,7 +32,7 @@ arrow::Status ArrowAllocator::allocate_aligned(int64_t size, int64_t alignment, return arrow::Status::OK(); } *out = reinterpret_cast(_allocator.alloc(size, alignment)); - if (*out == NULL) { + if (*out == nullptr) { return arrow::Status::OutOfMemory("malloc of size ", size, " failed"); } return arrow::Status::OK(); @@ -52,7 +52,7 @@ arrow::Status ArrowAllocator::reallocate_aligned(int64_t old_size, int64_t new_s } *ptr = reinterpret_cast(_allocator.realloc(*ptr, static_cast(old_size), static_cast(new_size), alignment)); - if (*ptr == NULL) { + if (*ptr == nullptr) { *ptr = previous_ptr; return arrow::Status::OutOfMemory("realloc of size ", new_size, " failed"); } From d746ddc44603b8c12ecf6a09c79c8f1681a95dbf Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Wed, 24 Jul 2024 01:21:23 +0800 
Subject: [PATCH 5/5] Fix review comments. --- be/src/vec/common/allocator.cpp | 11 +--- be/src/vec/common/allocator.h | 54 +++++++------------ .../exec/format/parquet/arrow_memory_pool.h | 5 -- 3 files changed, 20 insertions(+), 50 deletions(-) diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 42caeb80c738cd..8eba45b03b574e 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -238,16 +238,7 @@ template class Allocator; template class Allocator; template class Allocator; template class Allocator; -#if defined(USE_JEMALLOC) -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -template class Allocator; -#endif // #if defined(USE_JEMALLOC) + template class Allocator; template class Allocator; template class Allocator; diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 4145b7bb2d1e78..9e0a4ac966fae7 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -23,6 +23,9 @@ // TODO: Readable #include +#if defined(USE_JEMALLOC) +#include +#endif // defined(USE_JEMALLOC) #include #include #include @@ -85,9 +88,11 @@ class DefaultMemoryAllocator { static void free(void* p) __THROW { std::free(p); } - static void release_unused() {} - -private: + static void release_unused() { +#if defined(USE_JEMALLOC) + jemallctl(fmt::format("arena.{}.purge", MALLCTL_ARENAS_ALL).c_str(), NULL, NULL, NULL, 0); +#endif // defined(USE_JEMALLOC) + } }; /** It would be better to put these Memory Allocators where they are used, such as in the orc memory pool and arrow memory pool. 
@@ -159,11 +164,18 @@ class RecordSizeMemoryAllocator { static void* realloc(void* ptr, size_t size) __THROW { std::lock_guard lock(_mutex); + + auto it = _allocated_sizes.find(ptr); + if (it != _allocated_sizes.end()) { + _allocated_sizes.erase(it); + } + void* p = std::realloc(ptr, size); + if (p) { - _allocated_sizes.erase(ptr); _allocated_sizes[p] = size; } + return p; } @@ -182,30 +194,6 @@ class RecordSizeMemoryAllocator { static std::mutex _mutex; }; -#if defined(USE_JEMALLOC) -#include -class ArrowJemallocMemoryAllocator { -public: - static void* malloc(size_t size) __THROW { return je_malloc(size); } - - static void* calloc(size_t n, size_t size) __THROW { return je_calloc(n, size); } - - static constexpr bool need_record_actual_size() { return false; } - - static int posix_memalign(void** ptr, size_t alignment, size_t size) __THROW { - return je_posix_memalign(ptr, alignment, size); - } - - static void* realloc(void* ptr, size_t size) __THROW { return je_realloc(ptr, size); } - - static void free(void* p) __THROW { je_free(p); } - - static void release_unused() { - je_mallctl(fmt::format("arena.{}.purge", MALLCTL_ARENAS_ALL).c_str(), NULL, NULL, NULL, 0); - } -}; -#endif // defined(USE_JEMALLOC) - /** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena. * Also used in hash tables. * The interface is different from std::allocator @@ -279,6 +267,9 @@ class Allocator { throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } + + if constexpr (clear_memory) memset(buf, 0, size); + if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); } @@ -292,12 +283,6 @@ class Allocator { /// Free memory range. 
void free(void* buf, size_t size) { - /*size_t record_size = 0; - if (size) { - record_size = size; - } else { - record_size = MemoryAllocator::allocated_size(buf); - }*/ if (use_mmap && size >= doris::config::mmap_threshold) { if (0 != munmap(buf, size)) { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); @@ -305,7 +290,6 @@ class Allocator { } else { MemoryAllocator::free(buf); } - //release_memory(record_size); release_memory(size); } diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.h b/be/src/vec/exec/format/parquet/arrow_memory_pool.h index 8d26c041dbf559..a93e426f3746e9 100644 --- a/be/src/vec/exec/format/parquet/arrow_memory_pool.h +++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.h @@ -38,12 +38,7 @@ static constexpr uint8_t kDeallocPoison = 0xBE; extern int64_t zero_size_area[1]; static uint8_t* const kZeroSizeArea = reinterpret_cast(&zero_size_area); -#if defined(USE_JEMALLOC) -#include -using ARROW_MEMORY_ALLOCATOR = ArrowJemallocMemoryAllocator; -#else using ARROW_MEMORY_ALLOCATOR = DefaultMemoryAllocator; -#endif // #if defined(USE_JEMALLOC) class ArrowAllocator { public: