From 86028cf706770424a5204b77316a7c3b2045a51c Mon Sep 17 00:00:00 2001 From: Tiewei Fang Date: Thu, 3 Apr 2025 16:50:49 +0800 Subject: [PATCH] [fix](Parquet) add a memory tracker to parquet meta (#49037) Problem Summary: Add a memory tracker to the Parquet metadata, allowing us to monitor the usage of Parquet metadata. --- be/src/runtime/exec_env.h | 4 ++++ be/src/runtime/exec_env_init.cpp | 2 ++ be/src/vec/exec/format/parquet/parquet_thrift_util.h | 2 +- .../exec/format/parquet/vparquet_file_metadata.cpp | 11 ++++++++++- .../vec/exec/format/parquet/vparquet_file_metadata.h | 6 ++++-- 5 files changed, 21 insertions(+), 4 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 35b0c842d703cb..44cda3f81deab0 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -214,6 +214,7 @@ class ExecEnv { return _subcolumns_tree_tracker; } std::shared_ptr s3_file_buffer_tracker() { return _s3_file_buffer_tracker; } + std::shared_ptr parquet_meta_tracker() { return _parquet_meta_tracker; } ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } ThreadPool* buffered_reader_prefetch_thread_pool() { @@ -409,6 +410,9 @@ class ExecEnv { std::shared_ptr _subcolumns_tree_tracker; std::shared_ptr _s3_file_buffer_tracker; + // Tracking memory consumption of parquet meta + std::shared_ptr _parquet_meta_tracker; + std::unique_ptr _send_batch_thread_pool; // Threadpool used to prefetch remote file for buffered reader std::unique_ptr _buffered_reader_prefetch_thread_pool; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index ab28ce3f5ad4ae..968d49d56b11af 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -598,6 +598,8 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); _stream_load_pipe_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe"); + _parquet_meta_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::METADATA, "ParquetMeta"); } Status ExecEnv::_check_deploy_mode() { diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h index b767f177f4a326..f6426a64495be9 100644 --- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h +++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h @@ -75,7 +75,7 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_m tparquet::FileMetaData t_metadata; // deserialize footer RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata)); - *file_metadata = new FileMetaData(t_metadata); + *file_metadata = new FileMetaData(t_metadata, metadata_size); RETURN_IF_ERROR((*file_metadata)->init_schema()); *meta_size = PARQUET_FOOTER_SIZE + metadata_size; return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp index 7a164c306c208b..98de497e320b1b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp @@ -22,11 +22,20 @@ #include #include +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "schema_desc.h" namespace doris::vectorized { -FileMetaData::FileMetaData(tparquet::FileMetaData& metadata) : _metadata(metadata) {} +FileMetaData::FileMetaData(tparquet::FileMetaData& metadata, size_t mem_size) + : _metadata(metadata), _mem_size(mem_size) { + ExecEnv::GetInstance()->parquet_meta_tracker()->consume(mem_size); +} + +FileMetaData::~FileMetaData() { + ExecEnv::GetInstance()->parquet_meta_tracker()->release(_mem_size); +} Status FileMetaData::init_schema() { if (_metadata.schema[0].num_children <= 0) { diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h index 5d745a0db62f41..d1ebb06957daa3 100644 --- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h +++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h @@ -27,8 +27,8 @@ namespace doris::vectorized { class FileMetaData { public: - FileMetaData(tparquet::FileMetaData& metadata); - ~FileMetaData() = default; + FileMetaData(tparquet::FileMetaData& metadata, size_t mem_size); + ~FileMetaData(); Status init_schema(); const FieldDescriptor& schema() const { return _schema; } const tparquet::FileMetaData& to_thrift(); @@ -36,10 +36,12 @@ class FileMetaData { _schema.iceberg_sanitize(read_columns); } std::string debug_string() const; + size_t get_mem_size() const { return _mem_size; } private: tparquet::FileMetaData _metadata; FieldDescriptor _schema; + size_t _mem_size; }; } // namespace doris::vectorized