Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskReques
auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
if (tablet_ptr != nullptr) {
EngineIndexChangeTask engine_task(engine, alter_inverted_index_rq);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = engine_task.execute();
} else {
status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
Expand Down Expand Up @@ -940,6 +941,7 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest&
EngineChecksumTask engine_task(engine, check_consistency_req.tablet_id,
check_consistency_req.schema_hash, check_consistency_req.version,
&checksum);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
Status status = engine_task.execute();
if (!status.ok()) {
LOG_WARNING("failed to check consistency")
Expand Down Expand Up @@ -1596,6 +1598,7 @@ void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
std::vector<TTabletInfo> tablet_infos;

EngineBatchLoadTask engine_task(engine, const_cast<TPushReq&>(push_req), &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();

// Return result to fe
Expand Down Expand Up @@ -1928,6 +1931,7 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,

std::vector<TTabletInfo> tablet_infos;
EngineCloneTask engine_task(engine, clone_req, master_info, req.signature, &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();
// Return result to fe
TFinishTaskRequest finish_task_request;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/task/engine_batch_load_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ EngineBatchLoadTask::EngineBatchLoadTask(StorageEngine& engine, TPushReq& push_r
EngineBatchLoadTask::~EngineBatchLoadTask() = default;

Status EngineBatchLoadTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
Status status;
if (_push_req.push_type == TPushType::LOAD_V2) {
RETURN_IF_ERROR(_init());
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_batch_load_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "olap/task/engine_task.h"

namespace doris {
class MemTrackerLimiter;
class TPushReq;
class TTabletInfo;
class StorageEngine;
Expand Down Expand Up @@ -68,7 +67,6 @@ class EngineBatchLoadTask final : public EngineTask {
std::vector<TTabletInfo>* _tablet_infos;
std::string _remote_file_path;
std::string _local_file_path;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // class EngineBatchLoadTask
} // namespace doris
#endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H
10 changes: 5 additions & 5 deletions be/src/olap/task/engine_checksum_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ EngineChecksumTask::EngineChecksumTask(StorageEngine& engine, TTabletId tablet_i
_tablet_id(tablet_id),
_schema_hash(schema_hash),
_version(version),
_checksum(checksum),
_mem_tracker(std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
"EngineChecksumTask#tabletId=" + std::to_string(tablet_id))) {}
_checksum(checksum) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
"EngineChecksumTask#tabletId=" + std::to_string(tablet_id));
}

EngineChecksumTask::~EngineChecksumTask() = default;

Status EngineChecksumTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
return _compute_checksum();
} // execute

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_checksum_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "olap/task/engine_task.h"

namespace doris {
class MemTrackerLimiter;
class StorageEngine;

// base class for storage engine
Expand All @@ -48,7 +47,6 @@ class EngineChecksumTask final : public EngineTask {
TSchemaHash _schema_hash;
TVersion _version;
uint32_t* _checksum;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ EngineCloneTask::EngineCloneTask(StorageEngine& engine, const TCloneReq& clone_r

Status EngineCloneTask::execute() {
// register the tablet to avoid it is deleted by gc thread during clone process
SCOPED_ATTACH_TASK(_mem_tracker);
if (!_engine.tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_clone_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

namespace doris {
class DataDir;
class MemTrackerLimiter;
class TCloneReq;
class TMasterInfo;
class TTabletInfo;
Expand Down Expand Up @@ -95,7 +94,6 @@ class EngineCloneTask final : public EngineTask {
const TMasterInfo& _master_info;
int64_t _copy_size;
int64_t _copy_time_ms;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
std::vector<PendingRowsetGuard> _pending_rs_guards;
}; // EngineTask

Expand Down
17 changes: 8 additions & 9 deletions be/src/olap/task/engine_index_change_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@
#include "olap/task/engine_index_change_task.h"

#include "olap/storage_engine.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/doris_metrics.h"

namespace doris {

EngineIndexChangeTask::EngineIndexChangeTask(
StorageEngine& engine, const TAlterInvertedIndexReq& alter_inverted_index_request)
: _engine(engine),
_alter_inverted_index_req(alter_inverted_index_request),
_mem_tracker(std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineIndexChangeTask#tabletId={}",
std::to_string(_alter_inverted_index_req.tablet_id)),
config::memory_limitation_per_thread_for_schema_change_bytes)) {}
: _engine(engine), _alter_inverted_index_req(alter_inverted_index_request) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineIndexChangeTask#tabletId={}",
std::to_string(_alter_inverted_index_req.tablet_id)),
config::memory_limitation_per_thread_for_schema_change_bytes);
}

EngineIndexChangeTask::~EngineIndexChangeTask() = default;

Status EngineIndexChangeTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
DorisMetrics::instance()->alter_inverted_index_requests_total->increment(1);
uint64_t start = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/task/engine_index_change_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
namespace doris {
class StorageEngine;
class TAlterInvertedIndexReq;
class MemTrackerLimiter;

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
Expand All @@ -37,8 +36,6 @@ class EngineIndexChangeTask final : public EngineTask {
private:
StorageEngine& _engine;
const TAlterInvertedIndexReq& _alter_inverted_index_req;

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
5 changes: 5 additions & 0 deletions be/src/olap/task/engine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@

namespace doris {

class MemTrackerLimiter;

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
class EngineTask {
public:
virtual ~EngineTask() = default;
virtual Status execute() = 0;
std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return _mem_tracker; }

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

} // end namespace doris
1 change: 1 addition & 0 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class ThreadContext {
DCHECK(mem_tracker);
// Orphan is thread default tracker.
DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", thread mem tracker label: " << thread_mem_tracker()->label()
<< ", attach mem tracker label: " << mem_tracker->label();
#endif
_task_id = task_id;
Expand Down