From 4826d1c247999e9a8349a4e7efbb1726eea90f90 Mon Sep 17 00:00:00 2001
From: meiyi
Date: Wed, 20 Dec 2023 21:07:02 +0800
Subject: [PATCH] [fix](group_commit) fix group commit cancel stuck

---
 be/src/runtime/group_commit_mgr.cpp           | 37 ++++++++++++++++---
 be/src/runtime/group_commit_mgr.h             |  4 +-
 .../vec/exec/scan/group_commit_scan_node.cpp  |  2 +-
 3 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 16c841cc0f3808..f06c3bf3815f94 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -52,7 +52,8 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool
     return Status::OK();
 }
 
-Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, bool* eos) {
+Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block,
+                                 bool* find_block, bool* eos) {
     *find_block = false;
     *eos = false;
     std::unique_lock l(mutex);
@@ -65,22 +66,40 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
             need_commit = true;
         }
     }
-    while (status.ok() && _block_queue.empty() &&
+    while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
            (!need_commit || (need_commit && !_load_ids.empty()))) {
         CHECK_EQ(_single_block_queue_bytes->load(), 0);
         auto left_milliseconds = _group_commit_interval_ms;
+        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+                                std::chrono::steady_clock::now() - _start_time)
+                                .count();
         if (!need_commit) {
-            left_milliseconds = _group_commit_interval_ms -
-                                std::chrono::duration_cast<std::chrono::milliseconds>(
-                                        std::chrono::steady_clock::now() - _start_time)
-                                        .count();
+            left_milliseconds = _group_commit_interval_ms - duration;
             if (left_milliseconds <= 0) {
                 need_commit = true;
                 break;
             }
+        } else {
+            if (duration >= 10 * _group_commit_interval_ms) {
+                std::stringstream ss;
+                ss << "[";
+                for (auto& id : _load_ids) {
+                    ss << id.to_string() << ", ";
+                }
+                ss << "]";
+                LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
+                          << ", label=" << label << ", instance_id=" << load_instance_id
+                          << ", duration=" << duration << ", load_ids=" << ss.str()
+                          << ", runtime_state=" << runtime_state;
+            }
         }
         _get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
     }
+    if (runtime_state->is_cancelled()) {
+        auto st = Status::Cancelled(runtime_state->cancel_reason());
+        _cancel_without_lock(st);
+        return st;
+    }
     if (!_block_queue.empty()) {
         auto fblock = _block_queue.front();
         block->swap(*fblock.get());
@@ -120,6 +139,12 @@ Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
 void LoadBlockQueue::cancel(const Status& st) {
     DCHECK(!st.ok());
     std::unique_lock l(mutex);
+    _cancel_without_lock(st);
+}
+
+void LoadBlockQueue::_cancel_without_lock(const Status& st) {
+    LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ", label=" << label
+              << ", status=" << st.to_string();
     status = st;
     while (!_block_queue.empty()) {
         {
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 44826cd9465922..0c6edfc267f10b 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -50,7 +50,8 @@ class LoadBlockQueue {
     };
 
     Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
-    Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
+    Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
+                     bool* eos);
     Status add_load_id(const UniqueId& load_id);
     void remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
@@ -72,6 +73,7 @@ class LoadBlockQueue {
     Status status = Status::OK();
 
 private:
+    void _cancel_without_lock(const Status& st);
     std::chrono::steady_clock::time_point _start_time;
 
     std::condition_variable _put_cond;
diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp b/be/src/vec/exec/scan/group_commit_scan_node.cpp
index d78c165a3dcb4f..50ba8e31be468f 100644
--- a/be/src/vec/exec/scan/group_commit_scan_node.cpp
+++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp
@@ -35,7 +35,7 @@ GroupCommitScanNode::GroupCommitScanNode(ObjectPool* pool, const TPlanNode& tnod
 Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     bool find_node = false;
     while (!find_node && !*eos) {
-        RETURN_IF_ERROR(load_block_queue->get_block(block, &find_node, eos));
+        RETURN_IF_ERROR(load_block_queue->get_block(state, block, &find_node, eos));
     }
     return Status::OK();
 }