Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

#include "result_sink_operator.h"

#include <fmt/format.h>
#include <sys/select.h>

#include <memory>
#include <utility>

#include "common/config.h"
#include "common/object_pool.h"
#include "exec/rowid_fetcher.h"
#include "pipeline/exec/operator.h"
#include "runtime/buffer_control_block.h"
Expand Down Expand Up @@ -193,9 +192,10 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
final_status = st;
}

LOG_INFO("Query {} result sink closed with status {} and has written {} rows",
print_id(state->query_id()), final_status.to_string_no_stack(),
_writer->get_written_rows());
VLOG_NOTICE << fmt::format(
"Query {} result sink closed with status {} and has written {} rows",
print_id(state->query_id()), final_status.to_string_no_stack(),
_writer->get_written_rows());
}

// close sender, this is normal path end
Expand Down
5 changes: 0 additions & 5 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re

auto* fragment_context = this;

LOG_INFO("PipelineFragmentContext::prepare")
.tag("query_id", print_id(_query_id))
.tag("fragment_id", _fragment_id)
.tag("pthread_id", (uintptr_t)pthread_self());

if (request.query_options.__isset.is_report_success) {
fragment_context->set_is_report_success(request.query_options.is_report_success);
}
Expand Down
21 changes: 8 additions & 13 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,17 +576,19 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
}

// Stage 2. prepare finished. then get FE instruction to execute
Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
std::shared_ptr<QueryContext> q_ctx = nullptr;
{
std::lock_guard<std::mutex> lock(_lock);
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
q_ctx = _get_or_erase_query_ctx(query_id);
}
if (q_ctx) {
q_ctx->set_ready_to_execute(Status::OK());
LOG_INFO("Query {} start execution", print_id(query_id));
} else {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
Expand Down Expand Up @@ -659,6 +661,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
return Status::OK();
}

// First time a fragment of a query arrived. print logs.
LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
<< ", total fragment num on current host: " << params.fragment_num_on_host
<< ", fe process uuid: " << params.query_options.fe_process_uuid
Expand Down Expand Up @@ -687,7 +690,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
}

_set_scan_concurrency(params, query_ctx.get());
const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>;

if (params.__isset.workload_groups && !params.workload_groups.empty()) {
uint64_t tg_id = params.workload_groups[0].id;
Expand All @@ -698,21 +700,14 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
tg_id);

LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< ", use workload group: " << workload_group_ptr->debug_string()
<< ", is pipeline: " << ((int)is_pipeline);
} else {
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< " carried group info but can not find group in be";
LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id())
<< "can't find its workload group " << tg_id;
}
}
// There is some logic in query ctx's dctor, we could not check if exists and delete the
// temp query ctx now. For example, the query id maybe removed from workload group's queryset.
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
}
return Status::OK();
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
#include <exception>
#include <memory>
#include <mutex>
#include <sstream>
#include <utility>

#include "common/logging.h"
#include "olap/olap_common.h"
#include "pipeline/dependency.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -196,7 +194,8 @@ QueryContext::~QueryContext() {

_exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
DorisMetrics::instance()->query_ctx_cnt->increment(-1);
LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id), mem_tracker_msg);
// the only one msg shows query's end. any other msg should append to it if need.
LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
}

void QueryContext::set_ready_to_execute(Status reason) {
Expand Down