Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
Expand Down Expand Up @@ -249,7 +249,7 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
if (status.ok()) {
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask(
_cal_delete_bitmap_req(cal_delete_bitmap_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids) {
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"CloudEngineCalcDeleteBitmapTask");
}

Expand Down Expand Up @@ -134,7 +134,7 @@ CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
_transaction_id(transaction_id),
_version(version) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}", _transaction_id));
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ EnginePublishVersionTask::EnginePublishVersionTask(
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets),
_table_id_to_num_delta_rows(table_id_to_num_delta_rows) {
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"TabletPublishTxnTask");
}

Expand Down Expand Up @@ -357,7 +357,7 @@ TabletPublishTxnTask::TabletPublishTxnTask(StorageEngine& engine,
_transaction_id(transaction_id),
_version(version),
_tablet_info(tablet_info),
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"TabletPublishTxnTask")) {
_stats.submit_time_us = MonotonicMicros();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class AsyncTabletPublishTask {
_partition_id(partition_id),
_transaction_id(transaction_id),
_version(version),
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"AsyncTabletPublishTask")) {
_stats.submit_time_us = MonotonicMicros();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fold_constant_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals,
fragment_params.params = params;
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("FoldConstant:query_id={}", print_id(_query_id)));
_runtime_state =
RuntimeState::create_unique(fragment_params.params, query_options, query_globals,
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ std::string FragmentMgr::to_http_path(const std::string& file_name) {
Status FragmentMgr::trigger_pipeline_context_report(
const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
return _async_report_thread_pool->submit_func([this, req, ctx]() {
SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker);
coordinator_callback(req);
if (!req.done) {
ctx->refresh_next_report_time();
Expand Down Expand Up @@ -982,7 +983,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,

for (size_t i = 0; i < target_size; i++) {
RETURN_IF_ERROR(_thread_pool->submit_func([&, i]() {
prepare_status[i] = pre_and_submit(i);
{
SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id);
prepare_status[i] = pre_and_submit(i);
}
std::unique_lock<std::mutex> lock(m);
prepare_done++;
if (prepare_done == target_size) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
const auto& local_tablet_uid = local_tablet->tablet_uid();

std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE, fmt::format("IngestBinlog#TxnId={}", txn_id));
MemTrackerLimiter::Type::OTHER, fmt::format("IngestBinlog#TxnId={}", txn_id));
SCOPED_ATTACH_TASK(mem_tracker);

auto& request = arg->request;
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
const TFileScanRangeParams& params = file_scan_range.params;

std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("{}#{}", params.format_type, params.file_type));
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);

Expand Down