Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskReques
auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
if (tablet_ptr != nullptr) {
EngineIndexChangeTask engine_task(alter_inverted_index_rq);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = engine_task.execute();
} else {
status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
Expand Down Expand Up @@ -857,6 +858,7 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest&
EngineChecksumTask engine_task(check_consistency_req.tablet_id,
check_consistency_req.schema_hash, check_consistency_req.version,
&checksum);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
Status status = engine_task.execute();
if (!status.ok()) {
LOG_WARNING("failed to check consistency")
Expand Down Expand Up @@ -1438,6 +1440,7 @@ void push_callback(const TAgentTaskRequest& req) {
std::vector<TTabletInfo> tablet_infos;

EngineBatchLoadTask engine_task(const_cast<TPushReq&>(push_req), &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();

// Return result to fe
Expand Down Expand Up @@ -1695,6 +1698,7 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,

std::vector<TTabletInfo> tablet_infos;
EngineCloneTask engine_task(clone_req, master_info, req.signature, &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();
// Return result to fe
TFinishTaskRequest finish_task_request;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/task/engine_batch_load_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet
EngineBatchLoadTask::~EngineBatchLoadTask() {}

Status EngineBatchLoadTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
Status status;
if (_push_req.push_type == TPushType::LOAD_V2) {
RETURN_IF_ERROR(_init());
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_batch_load_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "olap/task/engine_task.h"

namespace doris {
class MemTrackerLimiter;
class TPushReq;
class TTabletInfo;

Expand Down Expand Up @@ -71,7 +70,6 @@ class EngineBatchLoadTask final : public EngineTask {
std::vector<TTabletInfo>* _tablet_infos;
std::string _remote_file_path;
std::string _local_file_path;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // class EngineBatchLoadTask
} // namespace doris
#endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H
1 change: 0 additions & 1 deletion be/src/olap/task/engine_checksum_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h
}

Status EngineChecksumTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
return _compute_checksum();
} // execute

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_checksum_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "olap/task/engine_task.h"

namespace doris {
class MemTrackerLimiter;

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
Expand All @@ -49,7 +48,6 @@ class EngineChecksumTask : public EngineTask {
TSchemaHash _schema_hash;
TVersion _version;
uint32_t* _checksum;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&

Status EngineCloneTask::execute() {
// register the tablet to avoid it is deleted by gc thread during clone process
SCOPED_ATTACH_TASK(_mem_tracker);
if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_clone_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

namespace doris {
class DataDir;
class MemTrackerLimiter;
class TCloneReq;
class TMasterInfo;
class TTabletInfo;
Expand Down Expand Up @@ -95,7 +94,6 @@ class EngineCloneTask : public EngineTask {
const TMasterInfo& _master_info;
int64_t _copy_size;
int64_t _copy_time_ms;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
std::vector<PendingRowsetGuard> _pending_rs_guards;
}; // EngineTask

Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/task/engine_index_change_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

#include "olap/task/engine_index_change_task.h"

#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "runtime/memory/mem_tracker_limiter.h"

namespace doris {

Expand All @@ -33,7 +32,6 @@ EngineIndexChangeTask::EngineIndexChangeTask(
}

Status EngineIndexChangeTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
DorisMetrics::instance()->alter_inverted_index_requests_total->increment(1);
uint64_t start = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_index_change_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class EngineIndexChangeTask final : public EngineTask {

private:
const TAlterInvertedIndexReq& _alter_inverted_index_req;

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
5 changes: 5 additions & 0 deletions be/src/olap/task/engine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@

namespace doris {

class MemTrackerLimiter;

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
class EngineTask {
public:
virtual ~EngineTask() = default;
virtual Status execute() = 0;
std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return _mem_tracker; }

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

} // end namespace doris
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class ThreadContext {
DCHECK(mem_tracker);
// Orphan is thread default tracker.
DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", thread mem tracker label: " << thread_mem_tracker()->label()
<< ", attach mem tracker label: " << mem_tracker->label();
#endif
_task_id = task_id;
Expand Down