Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions be/src/io/fs/s3_file_bufferpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "io/cache/file_cache_common.h"
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/slice.h"
Expand Down Expand Up @@ -77,17 +78,19 @@ Slice FileBuffer::get_slice() const {
}

FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder,
size_t offset, OperationState state)
size_t offset, OperationState state,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
: _type(type),
_alloc_holder(std::move(alloc_holder)),
_offset(offset),
_size(0),
_state(std::move(state)),
_inner_data(std::make_unique<FileBuffer::PartData>()),
_capacity(_inner_data->size()) {}
_capacity(_inner_data->size()),
_mem_tracker(std::move(mem_tracker)) {}

FileBuffer::~FileBuffer() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_inner_data.reset();
}

Expand Down Expand Up @@ -240,21 +243,31 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_blocks_holder(
}

Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
auto mem_tracker = ExecEnv::GetInstance()->s3_file_buffer_tracker();
auto* thread_ctx = doris::thread_context(true);
if (thread_ctx != nullptr) {
// if thread local mem tracker is set, use it instead.
auto curr_tracker = thread_ctx->thread_mem_tracker_mgr->limiter_mem_tracker();
if (curr_tracker != ExecEnv::GetInstance()->orphan_mem_tracker()) {
mem_tracker = std::move(curr_tracker);
}
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
OperationState state(_sync_after_complete_task, _is_cancelled);

if (_type == BufferType::UPLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
std::move(_upload_cb), std::move(state), _offset,
std::move(_alloc_holder_cb)));
std::move(_alloc_holder_cb), std::move(mem_tracker)));
return Status::OK();
}
if (_type == BufferType::DOWNLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<DownloadFileBuffer>(
std::move(_download),
std::move(_write_to_local_file_cache),
std::move(_write_to_use_buffer), std::move(state),
_offset, std::move(_alloc_holder_cb)));
_offset, std::move(_alloc_holder_cb),
std::move(mem_tracker)));
return Status::OK();
}
// should never come here
Expand Down
14 changes: 9 additions & 5 deletions be/src/io/fs/s3_file_bufferpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "common/status.h"
#include "io/cache/file_block.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/crc32c.h"
#include "util/slice.h"
#include "util/threadpool.h"
Expand Down Expand Up @@ -77,7 +78,7 @@ struct OperationState {

struct FileBuffer {
FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
OperationState state);
OperationState state, std::shared_ptr<MemTrackerLimiter> mem_tracker);
virtual ~FileBuffer();
/**
* submit the correspoding task to async executor
Expand Down Expand Up @@ -127,14 +128,16 @@ struct FileBuffer {
struct PartData;
std::unique_ptr<PartData> _inner_data;
size_t _capacity;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

struct DownloadFileBuffer final : public FileBuffer {
DownloadFileBuffer(std::function<Status(Slice&)> download,
std::function<void(FileBlocksHolderPtr, Slice)> write_to_cache,
std::function<void(Slice, size_t)> write_to_use_buffer, OperationState state,
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
: FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state),
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
: FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state, std::move(mem_tracker)),
_download(std::move(download)),
_write_to_local_file_cache(std::move(write_to_cache)),
_write_to_use_buffer(std::move(write_to_use_buffer)) {}
Expand All @@ -153,8 +156,9 @@ struct DownloadFileBuffer final : public FileBuffer {

struct UploadFileBuffer final : public FileBuffer {
UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state,
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
: FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
: FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state, std::move(mem_tracker)),
_upload_to_remote(std::move(upload_cb)) {}
~UploadFileBuffer() override = default;
Status append_data(const Slice& s) override;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
#include "olap/txn_manager.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "segment_loader.h"
#include "service/point_query_executor.h"
#include "tablet.h"
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/version_graph.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "segment_loader.h"
#include "util/metrics.h"
#include "util/once.h"
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "olap/tablet_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
#include "util/thrift_rpc_helper.h"
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/snapshot_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "common/status.h"
#include "olap/tablet_fwd.h"
#include "runtime/memory/mem_tracker_limiter.h"

namespace doris {
namespace io {
Expand Down