9 changes: 2 additions & 7 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -582,10 +582,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
 void PipelineFragmentContext::_init_next_report_time() {
     auto interval_s = config::pipeline_status_report_interval;
     if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
-        std::vector<string> ins_ids;
-        instance_ids(ins_ids);
-        VLOG_FILE << "enable period report: instance_id="
-                  << fmt::format("{}", fmt::join(ins_ids, ", "));
+        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
         uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
         // We don't want to wait longer than it takes to run the entire fragment.
         _previous_report_time =
@@ -623,11 +620,9 @@ void PipelineFragmentContext::trigger_report_if_necessary() {
         return;
     }
     if (VLOG_FILE_IS_ON) {
-        std::vector<string> ins_ids;
-        instance_ids(ins_ids);
         VLOG_FILE << "Reporting "
                   << "profile for query_id " << print_id(_query_id)
-                  << ", instance ids: " << fmt::format("{}", fmt::join(ins_ids, ", "));
+                  << ", fragment id: " << _fragment_id;

         std::stringstream ss;
         _runtime_state->runtime_profile()->compute_time_in_profile();
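Note: the first hunk above also shows how periodic status reporting is scheduled. Each fragment offsets its first report by a random number of whole seconds within the report interval, so fragments of the same query do not all report at once. A toy, stand-alone illustration of that jitter computation; the interval value here is an assumption, standing in for config::pipeline_status_report_interval:

    #include <cstdint>
    #include <cstdlib>

    constexpr uint64_t NANOS_PER_SEC = 1000000000ULL;

    int main() {
        int interval_s = 5; // stand-in for config::pipeline_status_report_interval
        // Same shape as in _init_next_report_time(): a random offset of
        // 0..interval_s-1 whole seconds, expressed in nanoseconds.
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
        return report_fragment_offset < (uint64_t)interval_s * NANOS_PER_SEC ? 0 : 1;
    }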
14 changes: 0 additions & 14 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -117,20 +117,6 @@ class PipelineFragmentContext : public TaskExecutionContext {

     [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }

-    void instance_ids(std::vector<TUniqueId>& ins_ids) const {
-        ins_ids.resize(_fragment_instance_ids.size());
-        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
-            ins_ids[i] = _fragment_instance_ids[i];
-        }
-    }
-
-    void instance_ids(std::vector<string>& ins_ids) const {
-        ins_ids.resize(_fragment_instance_ids.size());
-        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
-            ins_ids[i] = print_id(_fragment_instance_ids[i]);
-        }
-    }
-
     void clear_finished_tasks() {
         for (size_t j = 0; j < _tasks.size(); j++) {
             for (size_t i = 0; i < _tasks[j].size(); i++) {
68 changes: 16 additions & 52 deletions be/src/runtime/fragment_mgr.cpp
@@ -106,7 +106,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
 bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
-bvar::Adder<int64_t> g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");

 bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
 bvar::Status<uint64_t> g_fragment_last_active_time(
@@ -633,18 +632,13 @@ void FragmentMgr::remove_pipeline_context(
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto query_id = f_context->get_query_id();
-        std::vector<TUniqueId> ins_ids;
-        f_context->instance_ids(ins_ids);
         int64 now = duration_cast<std::chrono::milliseconds>(
                             std::chrono::system_clock::now().time_since_epoch())
                             .count();
         g_fragment_executing_count << -1;
         g_fragment_last_active_time.set_value(now);
-        for (const auto& ins_id : ins_ids) {
-            LOG_INFO("Removing query {} instance {}", print_id(query_id), print_id(ins_id));
-            _pipeline_map.erase(ins_id);
-            g_pipeline_fragment_instances_count << -1;
-        }
+        LOG_INFO("Removing query {} fragment {}", print_id(query_id), f_context->get_fragment_id());
+        _pipeline_map.erase({query_id, f_context->get_fragment_id()});
     }
 }

@@ -778,11 +772,10 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
             continue;
         }
         auto timeout_second = it.second->timeout_second();
-        fmt::format_to(debug_string_buffer,
-                       "No.{} (elapse_second={}s, query_timeout_second={}s, instance_id="
-                       "{}, is_timeout={}) : {}\n",
-                       i, elapsed, timeout_second, print_id(it.first),
-                       it.second->is_timeout(now), it.second->debug_string());
+        fmt::format_to(
+                debug_string_buffer,
+                "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n", i,
+                elapsed, timeout_second, it.second->is_timeout(now), it.second->debug_string());
         i++;
     }
 }
@@ -842,11 +835,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     for (const auto& local_param : params.local_params) {
         const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
         std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(fragment_instance_id);
+        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
         if (iter != _pipeline_map.end()) {
-            return Status::InternalError(
-                    "exec_plan_fragment input duplicated fragment_instance_id({})",
-                    UniqueId(fragment_instance_id).to_string());
+            return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})",
+                                         params.fragment_id);
         }
         query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
     }
@@ -862,12 +854,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         g_fragment_executing_count << 1;
         g_fragment_last_active_time.set_value(now);
         std::lock_guard<std::mutex> lock(_lock);
-        std::vector<TUniqueId> ins_ids;
-        context->instance_ids(ins_ids);
         // TODO: simplify this mapping
-        for (const auto& ins_id : ins_ids) {
-            _pipeline_map.insert({ins_id, context});
-        }
+        _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
     }
     query_ctx->set_pipeline_context(params.fragment_id, context);

@@ -912,31 +900,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
               << " is cancelled and removed. Reason: " << reason.to_string();
 }

-void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status reason) {
-    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
-    {
-        std::lock_guard<std::mutex> state_lock(_lock);
-        DCHECK(!_pipeline_map.contains(instance_id))
-                << " Pipeline tasks should be canceled by query instead of instance! Query ID: "
-                << print_id(_pipeline_map[instance_id]->get_query_id());
-        const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
-        if (is_pipeline_instance) {
-            auto itr = _pipeline_map.find(instance_id);
-            if (itr != _pipeline_map.end()) {
-                pipeline_ctx = itr->second;
-            } else {
-                LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
-                             << " to cancel";
-                return;
-            }
-        }
-    }
-
-    if (pipeline_ctx != nullptr) {
-        pipeline_ctx->cancel(reason);
-    }
-}
-
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";

@@ -1202,15 +1165,16 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,

     RuntimeFilterMgr* runtime_filter_mgr = nullptr;

-    const auto& fragment_instance_ids = request->fragment_instance_ids();
+    const auto& fragment_ids = request->fragment_ids();
     {
         std::unique_lock<std::mutex> lock(_lock);
-        for (UniqueId fragment_instance_id : fragment_instance_ids) {
-            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
+        for (auto fragment_id : fragment_ids) {
             if (is_pipeline) {
-                auto iter = _pipeline_map.find(tfragment_instance_id);
+                auto iter = _pipeline_map.find(
+                        {UniqueId(request->query_id()).to_thrift(), fragment_id});
                 if (iter == _pipeline_map.end()) {
+                    LOG(WARNING) << "No pipeline fragment is found: Query-ID = "
+                                 << request->query_id() << " fragment_id = " << fragment_id;
                     continue;
                 }
                 pip_context = iter->second;
8 changes: 4 additions & 4 deletions be/src/runtime/fragment_mgr.h
@@ -100,9 +100,6 @@ class FragmentMgr : public RestMonitorIface {
     Status trigger_pipeline_context_report(const ReportStatusRequest,
                                            std::shared_ptr<pipeline::PipelineFragmentContext>&&);

-    // Cancel instance (pipeline or nonpipeline).
-    void cancel_instance(const TUniqueId instance_id, const Status reason);
-
     // Can be used in both version.
     void cancel_query(const TUniqueId query_id, const Status reason);

@@ -169,7 +166,10 @@ class FragmentMgr : public RestMonitorIface {
     // call _lock, so that there is dead lock.
     std::mutex _lock;

-    std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    std::unordered_map<std::pair<TUniqueId, int>,
+                       std::shared_ptr<pipeline::PipelineFragmentContext>>
+            _pipeline_map;

     // query id -> QueryContext
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
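Note: std::unordered_map has no built-in hash for std::pair keys, so keying _pipeline_map by (query_id, fragment_id) relies on a hash specialization being in scope for the pair type. A minimal stand-alone sketch of the idea; the TUniqueId and PipelineFragmentContext definitions below are hypothetical stand-ins, not the real Doris types, and the mixing constant follows the common boost::hash_combine recipe:

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <memory>
    #include <unordered_map>
    #include <utility>

    // Hypothetical stand-in for the Thrift-generated TUniqueId (two 64-bit halves).
    struct TUniqueId {
        int64_t hi = 0;
        int64_t lo = 0;
        bool operator==(const TUniqueId& other) const { return hi == other.hi && lo == other.lo; }
    };

    namespace std {
    // Combine the member hashes, boost::hash_combine style, so the pair can
    // serve as an unordered_map key.
    template <>
    struct hash<pair<TUniqueId, int>> {
        size_t operator()(const pair<TUniqueId, int>& key) const {
            size_t seed = hash<int64_t>()(key.first.hi);
            seed ^= hash<int64_t>()(key.first.lo) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
            seed ^= hash<int>()(key.second) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
            return seed;
        }
    };
    } // namespace std

    struct PipelineFragmentContext {}; // placeholder for the real context class

    int main() {
        std::unordered_map<std::pair<TUniqueId, int>,
                           std::shared_ptr<PipelineFragmentContext>>
                pipeline_map;
        TUniqueId query_id{42, 7};
        pipeline_map.insert({{query_id, /*fragment_id=*/3},
                             std::make_shared<PipelineFragmentContext>()});
        return pipeline_map.count({query_id, 3}) == 1 ? 0 : 1;
    }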
2 changes: 0 additions & 2 deletions be/src/runtime/query_context.h
@@ -195,8 +195,6 @@ class QueryContext {

     ThreadPool* get_memtable_flush_pool();

-    std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; }
-
     int64_t mem_limit() const { return _bytes_limit; }

     void set_merge_controller_handler(
16 changes: 11 additions & 5 deletions be/src/runtime/runtime_filter_mgr.cpp
@@ -229,7 +229,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     // so we need to copy to cnt_val
     cnt_val->producer_size = producer_size;
     cnt_val->runtime_filter_desc = *runtime_filter_desc;
-    cnt_val->target_info = *target_info;
     cnt_val->pool.reset(new ObjectPool());
     cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));

@@ -458,10 +457,17 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
     }
     closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000);
     // set fragment-id
-    for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
-        PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
-        cur_id->set_hi(target_fragment_instance_id.hi);
-        cur_id->set_lo(target_fragment_instance_id.lo);
+    if (target.__isset.target_fragment_ids) {
+        for (auto& target_fragment_id : target.target_fragment_ids) {
+            closure->request_->add_fragment_ids(target_fragment_id);
+        }
+    } else {
+        // FE not upgraded yet.
+        for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
+            PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
+            cur_id->set_hi(target_fragment_instance_id.hi);
+            cur_id->set_lo(target_fragment_instance_id.lo);
+        }
     }

     std::shared_ptr<PBackendService_Stub> stub(
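Note: the __isset.target_fragment_ids check above is the usual Thrift pattern for rolling upgrades. Because the new field is optional, the BE can tell whether the FE that built the struct actually populated it, and fall back to the legacy per-instance path otherwise. A simplified stand-alone sketch of the dispatch; the structs below are hypothetical mocks of the generated Thrift types, not the real ones:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Mock of a Thrift-generated struct: optional fields get an __isset flag
    // that stays false unless the sender explicitly set the field.
    struct TargetParamsV2 {
        std::vector<int64_t> target_fragment_instance_ids; // legacy ids, simplified
        std::vector<int32_t> target_fragment_ids;          // the field added by this PR
        struct Isset {
            bool target_fragment_ids = false;
        } __isset;
    };

    // Mock publish request that supports both addressing schemes.
    struct PublishRequest {
        std::vector<int32_t> fragment_ids;
        std::vector<int64_t> fragment_instance_ids;
    };

    void fill_request(const TargetParamsV2& target, PublishRequest* req) {
        if (target.__isset.target_fragment_ids) {
            for (int32_t id : target.target_fragment_ids) {
                req->fragment_ids.push_back(id); // upgraded FE: address by fragment id
            }
        } else {
            for (int64_t id : target.target_fragment_instance_ids) {
                req->fragment_instance_ids.push_back(id); // old FE: fall back to instance ids
            }
        }
    }

    int main() {
        TargetParamsV2 target;
        target.target_fragment_ids = {3, 5};
        target.__isset.target_fragment_ids = true; // Thrift setters flip this automatically
        PublishRequest req;
        fill_request(target, &req);
        std::cout << "fragment-id targets: " << req.fragment_ids.size() << "\n"; // prints 2
        return 0;
    }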
1 change: 0 additions & 1 deletion be/src/runtime/runtime_filter_mgr.h
@@ -168,7 +168,6 @@ class RuntimeFilterMergeControllerEntity {
         int producer_size;
         uint64_t global_size;
         TRuntimeFilterDesc runtime_filter_desc;
-        std::vector<doris::TRuntimeFilterTargetParams> target_info;
        std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
         IRuntimeFilter* filter = nullptr;
         std::unordered_set<UniqueId> arrive_id;
7 changes: 0 additions & 7 deletions be/src/service/backend_service.cpp
@@ -657,13 +657,6 @@ Status BaseBackendService::start_plan_fragment_execution(
                                            QuerySource::INTERNAL_FRONTEND);
 }

-void BaseBackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                                              const TCancelPlanFragmentParams& params) {
-    LOG(INFO) << "cancel_plan_fragment(): instance_id=" << print_id(params.fragment_instance_id);
-    _exec_env->fragment_mgr()->cancel_instance(
-            params.fragment_instance_id, Status::InternalError("cancel message received from FE"));
-}
-
 void BaseBackendService::transmit_data(TTransmitDataResult& return_val,
                                        const TTransmitDataParams& params) {
     VLOG_ROW << "transmit_data(): instance_id=" << params.dest_fragment_instance_id
2 changes: 1 addition & 1 deletion be/src/service/backend_service.h
@@ -90,7 +90,7 @@ class BaseBackendService : public BackendServiceIf {
                                      const TExecPlanFragmentParams& params) override;

     void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                              const TCancelPlanFragmentParams& params) override;
+                              const TCancelPlanFragmentParams& params) override {};

     void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) override;

24 changes: 15 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2004,7 +2004,8 @@ private void assignRuntimeFilterAddr() throws Exception {
             List<FRuntimeFilterTargetParam> targetFragments = ridToTargetParam.computeIfAbsent(rid,
                     k -> new ArrayList<>());
             for (final FInstanceExecParam instance : params.instanceExecParams) {
-                targetFragments.add(new FRuntimeFilterTargetParam(instance.instanceId, toBrpcHost(instance.host)));
+                targetFragments.add(new FRuntimeFilterTargetParam(instance.fragment().getFragmentId().asInt(),
+                        toBrpcHost(instance.host)));
             }
         }

@@ -3188,20 +3189,24 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
                     for (FRuntimeFilterTargetParam targetParam : fParams) {
                         if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
                             targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                    .target_fragment_instance_ids
-                                    .add(targetParam.targetFragmentInstanceId);
+                                    .target_fragment_ids
+                                    .add(targetParam.targetFragmentId);
                         } else {
                             targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
                                     new TRuntimeFilterTargetParamsV2());
                             targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
                                     .target_fragment_instance_addr
                                     = targetParam.targetFragmentInstanceAddr;
                             targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                    .target_fragment_instance_ids
+                                    .target_fragment_ids
                                     = new ArrayList<>();
+                            targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
+                                    .target_fragment_ids
+                                    .add(targetParam.targetFragmentId);
+                            // `target_fragment_instance_ids` is a required field
                             targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
                                     .target_fragment_instance_ids
-                                    .add(targetParam.targetFragmentInstanceId);
+                                    = new ArrayList<>();
                         }
                     }

@@ -3210,7 +3215,8 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
             } else {
                 List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
                 for (FRuntimeFilterTargetParam targetParam : fParams) {
-                    targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
+                    // Instance id make no sense if this runtime filter doesn't have remote targets.
+                    targetParams.add(new TRuntimeFilterTargetParams(new TUniqueId(),
                             targetParam.targetFragmentInstanceAddr));
                 }
                 localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
@@ -3380,12 +3386,12 @@ private void updateProfileIfPresent(Consumer<SummaryProfile> profileAction) {

     // Runtime filter target fragment instance param
     static class FRuntimeFilterTargetParam {
-        public TUniqueId targetFragmentInstanceId;
+        public int targetFragmentId;

         public TNetworkAddress targetFragmentInstanceAddr;

-        public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
-            this.targetFragmentInstanceId = id;
+        public FRuntimeFilterTargetParam(int id, TNetworkAddress host) {
+            this.targetFragmentId = id;
             this.targetFragmentInstanceAddr = host;
         }
     }
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
@@ -613,6 +613,7 @@ message PPublishFilterRequestV2 {
     optional int64 merge_time = 9;
     optional bool contain_null = 10;
     optional bool ignored = 11;
+    repeated int32 fragment_ids = 12;
 };

 message PPublishFilterResponse {
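Note: adding fragment_ids under a fresh tag (12) is wire-compatible, since an old FE simply never emits the field. Unlike a Thrift optional, a repeated protobuf field carries no has_/__isset flag, so "not sent" and "sent empty" both look like size() == 0 and presence is inferred from the element count. A hedged sketch of the receiver-side branch; the class below is a mock of the protobuf-generated accessors, not the real generated code:

    #include <cstdint>
    #include <vector>

    // Mock of the protobuf-generated accessors for PPublishFilterRequestV2.
    class PPublishFilterRequestV2Mock {
    public:
        int fragment_ids_size() const { return static_cast<int>(_fragment_ids.size()); }
        const std::vector<int32_t>& fragment_ids() const { return _fragment_ids; }
        void add_fragment_ids(int32_t id) { _fragment_ids.push_back(id); }

    private:
        std::vector<int32_t> _fragment_ids;
    };

    bool use_fragment_id_path(const PPublishFilterRequestV2Mock& request) {
        // An old FE never sets the field, so size() == 0 selects the legacy
        // instance-id path during a rolling upgrade.
        return request.fragment_ids_size() > 0;
    }

    int main() {
        PPublishFilterRequestV2Mock request;
        request.add_fragment_ids(3);
        return use_fragment_id_path(request) ? 0 : 1;
    }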
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
@@ -367,6 +367,7 @@ struct TRuntimeFilterTargetParamsV2 {
     1: required list<Types.TUniqueId> target_fragment_instance_ids
     // The address of the instance where the fragment is expected to run
     2: required Types.TNetworkAddress target_fragment_instance_addr
+    3: optional list<i32> target_fragment_ids
 }

 struct TRuntimeFilterParams {