Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool
return Status::OK();
}

Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, bool* eos) {
Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block, bool* eos) {
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
Expand All @@ -65,22 +66,40 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
need_commit = true;
}
}
while (status.ok() && _block_queue.empty() &&
while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
(!need_commit || (need_commit && !_load_ids.empty()))) {
CHECK_EQ(_single_block_queue_bytes->load(), 0);
auto left_milliseconds = _group_commit_interval_ms;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (!need_commit) {
left_milliseconds = _group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
left_milliseconds = _group_commit_interval_ms - duration;
if (left_milliseconds <= 0) {
need_commit = true;
break;
}
} else {
if (duration >= 10 * _group_commit_interval_ms) {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids) {
ss << id.to_string() << ", ";
}
ss << "]";
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", duration=" << duration << ", load_ids=" << ss.str()
<< ", runtime_state=" << runtime_state;
}
}
_get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
}
if (runtime_state->is_cancelled()) {
auto st = Status::Cancelled(runtime_state->cancel_reason());
_cancel_without_lock(st);
return st;
}
if (!_block_queue.empty()) {
auto fblock = _block_queue.front();
block->swap(*fblock.get());
Expand Down Expand Up @@ -120,6 +139,12 @@ Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
void LoadBlockQueue::cancel(const Status& st) {
DCHECK(!st.ok());
std::unique_lock l(mutex);
_cancel_without_lock(st);
}

void LoadBlockQueue::_cancel_without_lock(const Status& st) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method '_cancel_without_lock' can be made static [readability-convert-member-functions-to-static]

be/src/runtime/group_commit_mgr.h:75:

-     void _cancel_without_lock(const Status& st);
+     static void _cancel_without_lock(const Status& st);

LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ", label=" << label
<< ", status=" << st.to_string();
status = st;
while (!_block_queue.empty()) {
{
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class LoadBlockQueue {
};

Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
bool* eos);
Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
Expand All @@ -72,6 +73,7 @@ class LoadBlockQueue {
Status status = Status::OK();

private:
void _cancel_without_lock(const Status& st);
std::chrono::steady_clock::time_point _start_time;

std::condition_variable _put_cond;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/group_commit_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ GroupCommitScanNode::GroupCommitScanNode(ObjectPool* pool, const TPlanNode& tnod
Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
bool find_node = false;
while (!find_node && !*eos) {
RETURN_IF_ERROR(load_block_queue->get_block(block, &find_node, eos));
RETURN_IF_ERROR(load_block_queue->get_block(state, block, &find_node, eos));
}
return Status::OK();
}
Expand Down