From fbde5076086c99f725894cf3b74c6a2f299b0a6a Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Fri, 27 Sep 2024 13:02:32 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"Revert=20"[enhancement](mem-tracker)?= =?UTF-8?q?=20Use=20thread=20local=20mem=20tracker=20to=20tr=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit fd2e20fd317d10a8bfafa6902f4e7ea74b3754f1. --- be/src/io/fs/s3_file_bufferpool.cpp | 25 +++++++++++++++++++------ be/src/io/fs/s3_file_bufferpool.h | 14 +++++++++----- be/src/olap/tablet.cpp | 1 + be/src/olap/tablet.h | 1 + be/src/runtime/snapshot_loader.cpp | 1 + be/src/runtime/snapshot_loader.h | 1 + 6 files changed, 32 insertions(+), 11 deletions(-) diff --git a/be/src/io/fs/s3_file_bufferpool.cpp b/be/src/io/fs/s3_file_bufferpool.cpp index f1f90ea7f2ef06..0d59ea0ed88263 100644 --- a/be/src/io/fs/s3_file_bufferpool.cpp +++ b/be/src/io/fs/s3_file_bufferpool.cpp @@ -31,6 +31,7 @@ #include "io/cache/file_cache_common.h" #include "io/fs/s3_common.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" #include "util/defer_op.h" #include "util/slice.h" @@ -77,17 +78,19 @@ Slice FileBuffer::get_slice() const { } FileBuffer::FileBuffer(BufferType type, std::function alloc_holder, - size_t offset, OperationState state) + size_t offset, OperationState state, + std::shared_ptr mem_tracker) : _type(type), _alloc_holder(std::move(alloc_holder)), _offset(offset), _size(0), _state(std::move(state)), _inner_data(std::make_unique()), - _capacity(_inner_data->size()) {} + _capacity(_inner_data->size()), + _mem_tracker(std::move(mem_tracker)) {} FileBuffer::~FileBuffer() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); _inner_data.reset(); } @@ -240,13 +243,22 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_blocks_holder( } Status FileBufferBuilder::build(std::shared_ptr* buf) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker()); + auto mem_tracker = ExecEnv::GetInstance()->s3_file_buffer_tracker(); + auto* thread_ctx = doris::thread_context(true); + if (thread_ctx != nullptr) { + // if thread local mem tracker is set, use it instead. + auto curr_tracker = thread_ctx->thread_mem_tracker_mgr->limiter_mem_tracker(); + if (curr_tracker != ExecEnv::GetInstance()->orphan_mem_tracker()) { + mem_tracker = std::move(curr_tracker); + } + } + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); OperationState state(_sync_after_complete_task, _is_cancelled); if (_type == BufferType::UPLOAD) { RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared( std::move(_upload_cb), std::move(state), _offset, - std::move(_alloc_holder_cb))); + std::move(_alloc_holder_cb), std::move(mem_tracker))); return Status::OK(); } if (_type == BufferType::DOWNLOAD) { @@ -254,7 +266,8 @@ Status FileBufferBuilder::build(std::shared_ptr* buf) { std::move(_download), std::move(_write_to_local_file_cache), std::move(_write_to_use_buffer), std::move(state), - _offset, std::move(_alloc_holder_cb))); + _offset, std::move(_alloc_holder_cb), + std::move(mem_tracker))); return Status::OK(); } // should never come here diff --git a/be/src/io/fs/s3_file_bufferpool.h b/be/src/io/fs/s3_file_bufferpool.h index 1b552850ae3af8..a603c3cb29a4cb 100644 --- a/be/src/io/fs/s3_file_bufferpool.h +++ b/be/src/io/fs/s3_file_bufferpool.h @@ -27,6 +27,7 @@ #include "common/status.h" #include "io/cache/file_block.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/crc32c.h" #include "util/slice.h" #include "util/threadpool.h" @@ -77,7 +78,7 @@ struct OperationState { struct FileBuffer { FileBuffer(BufferType type, std::function alloc_holder, size_t offset, - OperationState state); + OperationState state, std::shared_ptr mem_tracker); virtual ~FileBuffer(); /** * submit the correspoding task to async executor @@ -127,14 +128,16 @@ struct FileBuffer { struct PartData; std::unique_ptr _inner_data; size_t _capacity; + std::shared_ptr _mem_tracker; }; struct DownloadFileBuffer final : public FileBuffer { DownloadFileBuffer(std::function download, std::function write_to_cache, std::function write_to_use_buffer, OperationState state, - size_t offset, std::function alloc_holder) - : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state), + size_t offset, std::function alloc_holder, + std::shared_ptr mem_tracker) + : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state, std::move(mem_tracker)), _download(std::move(download)), _write_to_local_file_cache(std::move(write_to_cache)), _write_to_use_buffer(std::move(write_to_use_buffer)) {} @@ -153,8 +156,9 @@ struct DownloadFileBuffer final : public FileBuffer { struct UploadFileBuffer final : public FileBuffer { UploadFileBuffer(std::function upload_cb, OperationState state, - size_t offset, std::function alloc_holder) - : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state), + size_t offset, std::function alloc_holder, + std::shared_ptr mem_tracker) + : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state, std::move(mem_tracker)), _upload_to_remote(std::move(upload_cb)) {} ~UploadFileBuffer() override = default; Status append_data(const Slice& s) override; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 51eabe5495ef89..8eb2625080c202 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -106,6 +106,7 @@ #include "olap/txn_manager.h" #include "olap/types.h" #include "olap/utils.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "segment_loader.h" #include "service/point_query_executor.h" #include "tablet.h" diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 33253e82ced2b5..168801ff582bd9 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -46,6 +46,7 @@ #include "olap/rowset/rowset_reader.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/version_graph.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "segment_loader.h" #include "util/metrics.h" #include "util/once.h" diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index d04a5463879c9e..25630f9c829916 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -52,6 +52,7 @@ #include "olap/tablet_manager.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/s3_uri.h" #include "util/s3_util.h" #include "util/thrift_rpc_helper.h" diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h index 7b1d5a0d9428f8..ac9d4d5c4c13a0 100644 --- a/be/src/runtime/snapshot_loader.h +++ b/be/src/runtime/snapshot_loader.h @@ -26,6 +26,7 @@ #include "common/status.h" #include "olap/tablet_fwd.h" +#include "runtime/memory/mem_tracker_limiter.h" namespace doris { namespace io {