From 3a5d6a401518dab4f280cc0874158641fd2e4671 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 25 Sep 2025 20:33:05 +0800 Subject: [PATCH] [fix](debug) Fix illegal access to runtime states (#56439) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Fixes a heap-buffer-overflow introduced by #56405: `PipelineFragmentContext::debug_string()` read `_task_runtime_states[i][j]`, where `i` is the position of a task inside instance `j`, but `_task_runtime_states` is indexed by pipeline first and a pipeline with a single task only has one slot, so the lookup could run past the end of the inner vector. Two changes here: 1. Print the local runtime filter manager info inside each task's own `debug_string()`. 2. Store each task together with its corresponding runtime state as a pair in `_tasks`, replacing the separate `_task_runtime_states` matrix. ASAN report: ==198869==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x7ba3927ac338 at pc 0x559d81e109b9 bp 0x7b7a5b8ad410 sp 0x7b7a5b8ad408 12:38:06  READ of size 8 at 0x7ba3927ac338 thread T1845 (EvHttpServer [w) 12:38:06  #0 0x559d81e109b8 in std::__uniq_ptr_impl >::_M_ptr() const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:193:51 12:38:06  #1 0x559d81e109b8 in std::unique_ptr >::get() const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:473:21 12:38:06  #2 0x559d81e109b8 in std::unique_ptr >::operator->() const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:466:9 12:38:06  #3 0x559d81e109b8 in doris::pipeline::PipelineFragmentContext::debug_string[abi:cxx11]() /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:1941:28 12:38:06  #4 0x559d6d65aa2f in doris::FragmentMgr::dump_pipeline_tasks[abi:cxx11](long)::$_0::operator()(phmap::flat_hash_map, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&) const /root/doris/be/src/runtime/fragment_mgr.cpp:807:36 12:38:06  + echo 'cp -r /mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0//Cluster0/fe/bin /home/work/pipline/backup_center/56368_04d746711764283b1cbafe40b1f849e10a5bcb6e_p0/fe/' 12:38:06  #5 0x559d6d65aa2f in doris::Status std::__invoke_impl, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, 
std::shared_ptr > > >&>(std::__invoke_other, doris::FragmentMgr::dump_pipeline_tasks[abi:cxx11](long)::$_0&, phmap::flat_hash_map, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:63:14 12:38:06  + cp -r /mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0//Cluster0/fe/conf /home/work/pipline/backup_center/56368_04d746711764283b1cbafe40b1f849e10a5bcb6e_p0/fe/ 12:38:06  #6 0x559d6d65aa2f in std::enable_if, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&>, doris::Status>::type std::__invoke_r, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&>(doris::FragmentMgr::dump_pipeline_tasks[abi:cxx11](long)::$_0&, phmap::flat_hash_map, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:116:9 12:38:06  #7 0x559d6d65aa2f in std::_Function_handler, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&), doris::FragmentMgr::dump_pipeline_tasks[abi:cxx11](long)::$_0>::_M_invoke(std::_Any_data const&, phmap::flat_hash_map, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292:9 12:38:06  #8 0x559d6d665d37 in std::function, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&)>::operator()(phmap::flat_hash_map, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&) const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:593:9 12:38:06  #9 0x559d6d665d37 in doris::ConcurrentContextMap, 
std::shared_ptr, doris::pipeline::PipelineFragmentContext>::apply(std::function, std::shared_ptr, phmap::Hash >, phmap::EqualTo >, std::allocator const, std::shared_ptr > > >&)>&&) /root/doris/be/src/runtime/fragment_mgr.h:94:31 12:38:06  #10 0x559d6d647f93 in doris::FragmentMgr::dump_pipeline_tasks[abi:cxx11](long) /root/doris/be/src/runtime/fragment_mgr.cpp:792:23 12:38:06  #11 0x559d7068942b in doris::LongPipelineTaskAction::handle(doris::HttpRequest*) /root/doris/be/src/http/action/pipeline_task_action.cpp:54:69 12:38:06  #12 0x559d82c776c6 (/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x385416c6) 12:38:06  #13 0x559d82c5775f in bufferevent_run_readcb_ /home/runner/work/doris-thirdparty/doris-thirdparty/thirdparty/src/libevent-release-2.1.12-stable/bufferevent.c:251:3 12:38:06  #14 0x559d82c79802 in bufferevent_trigger_nolock_ /home/runner/work/doris-thirdparty/doris-thirdparty/thirdparty/src/libevent-release-2.1.12-stable/bufferevent-internal.h:411:3 12:38:06  #15 0x559d82c79802 in bufferevent_readcb /home/runner/work/doris-thirdparty/doris-thirdparty/thirdparty/src/libevent-release-2.1.12-stable/bufferevent_sock.c:214:2 12:38:06  #16 0x559d82c60724 in event_persist_closure /home/runner/work/doris-thirdparty/doris-thirdparty/thirdparty/src/libevent-release-2.1.12-stable/event.c:1623:9 12:38:06  #17 0x559d82c60724 in event_process_active_single_queue /home/runner/work/doris-thirdparty/doris-thirdparty/thirdparty/src/libevent-release-2.1.12-stable/event.c:1682:4 12:38:06  #18 0x559d82c60d76 in event_process_active /home/runner/work/doris-thirdparty/doris-thirdparty/thirdparty/src/libevent-release-2.1.12-stable/event.c:1783:9 12:38:06  + echo 'cp -r /mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0//Cluster0/fe/log /home/work/pipline/backup_center/56368_04d746711764283b1cbafe40b1f849e10a5bcb6e_p0/fe/' 12:38:06  #19 0x559d82c6355f in event_base_loop.constprop.0 
/home/runner/work/doris-thirdparty/doris-thirdparty/thirdparty/src/libevent-release-2.1.12-stable/event.c:2006:12 12:38:06  #20 0x559d7072b08c in doris::EvHttpServer::start()::$_0::operator()() const /root/doris/be/src/http/ev_http_server.cpp:139:13 12:38:06  + cp -r /mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0//Cluster0/fe/log /home/work/pipline/backup_center/56368_04d746711764283b1cbafe40b1f849e10a5bcb6e_p0/fe/ 12:38:06  #21 0x559d7072b08c in void std::__invoke_impl(std::__invoke_other, doris::EvHttpServer::start()::$_0&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:63:14 12:38:06  #22 0x559d7072b08c in std::enable_if, void>::type std::__invoke_r(doris::EvHttpServer::start()::$_0&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:113:2 12:38:06  #23 0x559d7072b08c in std::_Function_handler::_M_invoke(std::_Any_data const&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292:9 12:38:06  #24 0x559d6e121772 in doris::ThreadPool::dispatch_thread() /root/doris/be/src/util/threadpool.cpp:614:24 12:38:06  #25 0x559d6e0fe7e6 in std::function::operator()() const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:593:9 12:38:06  #26 0x559d6e0fe7e6 in doris::Thread::supervise_thread(void*) /root/doris/be/src/util/thread.cpp:460:5 12:38:06  #27 0x559d6984ed26 in asan_thread_start(void*) (/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x1f118d26) 12:38:06  #28 0x7f83909ee608 in start_thread /build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8 12:38:06  #29 0x7f8390901132 in __clone /build/glibc-SzIz7B/glibc-2.31/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95 ### Check List (For Author) - Test - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. 
Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- be/src/pipeline/pipeline_fragment_context.cpp | 76 +++++++------------ be/src/pipeline/pipeline_fragment_context.h | 38 +++++----- be/src/pipeline/pipeline_task.cpp | 9 ++- 3 files changed, 56 insertions(+), 67 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 05f698ffeba6ff..ec048d0db51ab8 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -144,12 +144,7 @@ PipelineFragmentContext::~PipelineFragmentContext() { auto st = _query_ctx->exec_status(); for (size_t i = 0; i < _tasks.size(); i++) { if (!_tasks[i].empty()) { - _call_back(_tasks[i].front()->runtime_state(), &st); - } - } - for (auto& runtime_states : _task_runtime_states) { - for (auto& runtime_state : runtime_states) { - runtime_state.reset(); + _call_back(_tasks[i].front().first->runtime_state(), &st); } } _tasks.clear(); @@ -234,7 +229,7 @@ void PipelineFragmentContext::cancel(const Status reason) { for (auto& tasks : _tasks) { for (auto& task : tasks) { - task->terminate(); + task.first->terminate(); } } } @@ -379,9 +374,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { const auto target_size = _params.local_params.size(); _tasks.resize(target_size); _runtime_filter_mgr_map.resize(target_size); - _task_runtime_states.resize(_pipelines.size()); for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { - _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks()); 
_pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get(); } auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size()); @@ -416,14 +409,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto& pipeline = _pipelines[pip_idx]; if (pipeline->num_tasks() > 1 || i == 0) { - DCHECK(_task_runtime_states[pip_idx][i] == nullptr) - << print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " " - << pipeline->debug_string(); - _task_runtime_states[pip_idx][i] = RuntimeState::create_unique( + auto task_runtime_state = RuntimeState::create_unique( local_params.fragment_instance_id, _params.query_id, _params.fragment_id, _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); - auto& task_runtime_state = _task_runtime_states[pip_idx][i]; { // Initialize runtime state for this task task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker()); @@ -470,7 +459,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline), i); pipeline->incr_created_tasks(i, task.get()); pipeline_id_to_task.insert({pipeline->id(), task.get()}); - _tasks[i].emplace_back(std::move(task)); + _tasks[i].emplace_back( + std::pair, std::unique_ptr> { + std::move(task), std::move(task_runtime_state)}); } } @@ -1703,7 +1694,7 @@ Status PipelineFragmentContext::submit() { auto* scheduler = _query_ctx->get_pipe_exec_scheduler(); for (auto& task : _tasks) { for (auto& t : task) { - st = scheduler->submit(t); + st = scheduler->submit(t.first); DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed", { st = Status::Aborted("PipelineFragmentContext.submit.failed"); }); if (!st) { @@ -1810,12 +1801,9 @@ std::string PipelineFragmentContext::get_load_error_url() { if (const auto& str = _runtime_state->get_error_log_file_path(); 
!str.empty()) { return to_load_error_http_path(str); } - for (auto& task_states : _task_runtime_states) { - for (auto& task_state : task_states) { - if (!task_state) { - continue; - } - if (const auto& str = task_state->get_error_log_file_path(); !str.empty()) { + for (auto& tasks : _tasks) { + for (auto& task : tasks) { + if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) { return to_load_error_http_path(str); } } @@ -1827,12 +1815,9 @@ std::string PipelineFragmentContext::get_first_error_msg() { if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) { return str; } - for (auto& task_states : _task_runtime_states) { - for (auto& task_state : task_states) { - if (!task_state) { - continue; - } - if (const auto& str = task_state->get_first_error_msg(); !str.empty()) { + for (auto& tasks : _tasks) { + for (auto& task : tasks) { + if (const auto& str = task.second->get_first_error_msg(); !str.empty()) { return str; } } @@ -1862,11 +1847,9 @@ Status PipelineFragmentContext::send_report(bool done) { std::vector runtime_states; - for (auto& task_states : _task_runtime_states) { - for (auto& task_state : task_states) { - if (task_state) { - runtime_states.push_back(task_state.get()); - } + for (auto& tasks : _tasks) { + for (auto& task : tasks) { + runtime_states.push_back(task.second.get()); } } @@ -1900,15 +1883,15 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const // here to traverse the vector. 
for (const auto& task_instances : _tasks) { for (const auto& task : task_instances) { - if (task->is_running()) { + if (task.first->is_running()) { LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id) - << " is running, task: " << (void*)task.get() - << ", is_running: " << task->is_running(); + << " is running, task: " << (void*)task.first.get() + << ", is_running: " << task.first->is_running(); *has_running_task = true; return 0; } - size_t revocable_size = task->get_revocable_size(); + size_t revocable_size = task.first->get_revocable_size(); if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { res += revocable_size; } @@ -1921,9 +1904,9 @@ std::vector PipelineFragmentContext::get_revocable_tasks() const std::vector revocable_tasks; for (const auto& task_instances : _tasks) { for (const auto& task : task_instances) { - size_t revocable_size_ = task->get_revocable_size(); + size_t revocable_size_ = task.first->get_revocable_size(); if (revocable_size_ >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - revocable_tasks.emplace_back(task.get()); + revocable_tasks.emplace_back(task.first.get()); } } } @@ -1936,9 +1919,8 @@ std::string PipelineFragmentContext::debug_string() { for (size_t j = 0; j < _tasks.size(); j++) { fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); for (size_t i = 0; i < _tasks[j].size(); i++) { - fmt::format_to(debug_string_buffer, "Task {}: {}\n{}\n", i, - _tasks[j][i]->debug_string(), - _task_runtime_states[i][j]->local_runtime_filter_mgr()->debug_string()); + fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, + _tasks[j][i].first->debug_string()); } } @@ -1988,16 +1970,16 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const { return nullptr; } - for (const auto& runtime_states : _task_runtime_states) { - for (const auto& runtime_state : runtime_states) { - if (runtime_state == nullptr || runtime_state->runtime_profile() == nullptr) { + for (const auto& tasks : 
_tasks) { + for (const auto& task : tasks) { + if (task.second->runtime_profile() == nullptr) { continue; } auto tmp_load_channel_profile = std::make_shared(); - runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get(), - _runtime_state->profile_level()); + task.second->runtime_profile()->to_thrift(tmp_load_channel_profile.get(), + _runtime_state->profile_level()); _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile); } } diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index f3ee112b0a049e..81b3f57b01fbf8 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -117,7 +117,7 @@ class PipelineFragmentContext : public TaskExecutionContext { void clear_finished_tasks() { for (size_t j = 0; j < _tasks.size(); j++) { for (size_t i = 0; i < _tasks[j].size(); i++) { - _tasks[j][i]->stop_if_finished(); + _tasks[j][i].first->stop_if_finished(); } } } @@ -228,8 +228,25 @@ class PipelineFragmentContext : public TaskExecutionContext { bool _use_serial_source = false; OperatorPtr _root_op = nullptr; - // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. - std::vector>> _tasks; + // + /** + * Matrix stores tasks with local runtime states. + * This is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. + * + * 2-D matrix: + * +-------------------------+------------+-------+ + * | | Pipeline 0 | Pipeline 1 | ... | + * +------------+------------+------------+-------+ + * | Instance 0 | task 0-0 | task 0-1 | ... | + * +------------+------------+------------+-------+ + * | Instance 1 | task 1-0 | task 1-1 | ... | + * +------------+------------+------------+-------+ + * | ... 
| + * +--------------------------------------+-------+ + */ + std::vector< + std::vector, std::unique_ptr>>> + _tasks; // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both // of it in pipeline task not the fragment_context @@ -299,21 +316,6 @@ class PipelineFragmentContext : public TaskExecutionContext { // - _task_runtime_states is at the task level, unique to each task. std::vector _fragment_instance_ids; - /** - * Local runtime states for each task. - * - * 2-D matrix: - * +-------------------------+------------+-------+ - * | | Instance 0 | Instance 1 | ... | - * +------------+------------+------------+-------+ - * | Pipeline 0 | task 0-0 | task 0-1 | ... | - * +------------+------------+------------+-------+ - * | Pipeline 1 | task 1-0 | task 1-1 | ... | - * +------------+------------+------------+-------+ - * | ... | - * +--------------------------------------+-------+ - */ - std::vector>> _task_runtime_states; // Total instance num running on all BEs int _total_instances = -1; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 2da386e83a1c16..e069cb840d1ca3 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -735,10 +735,15 @@ std::string PipelineTask::debug_string() { return fmt::to_string(debug_string_buffer); } auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC; - fmt::format_to(debug_string_buffer, - " elapse time = {}s, block dependency = [{}]\noperators: ", elapsed, + fmt::format_to(debug_string_buffer, " elapse time = {}s, block dependency = [{}]\n", elapsed, cur_blocked_dep && !is_finalized() ? 
cur_blocked_dep->debug_string() : "NULL"); + if (_state && _state->local_runtime_filter_mgr()) { + fmt::format_to(debug_string_buffer, "local_runtime_filter_mgr: [{}]\n", + _state->local_runtime_filter_mgr()->debug_string()); + } + + fmt::format_to(debug_string_buffer, "operators: "); for (size_t i = 0; i < _operators.size(); i++) { fmt::format_to(debug_string_buffer, "\n{}", _opened && !is_finalized()