diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 5fe1c3b3e6c51c..63a70d6a7d6d7b 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -215,7 +215,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo if (state->is_cancelled()) { boost::unique_lock l(_row_batches_lock); _transfer_done = true; - boost::lock_guard guard(_status_mutex); + boost::lock_guard guard(_status_mutex); if (LIKELY(_status.ok())) { _status = Status::Cancelled("Cancelled"); } @@ -303,7 +303,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo // all scanner done, change *eos to true *eos = true; - boost::lock_guard guard(_status_mutex); + boost::lock_guard guard(_status_mutex); return _status; } @@ -348,7 +348,7 @@ Status OlapScanNode::close(RuntimeState* state) { // OlapScanNode terminate by exception // so that initiative close the Scanner - for (auto scanner : _all_olap_scanners) { + for (auto scanner : _olap_scanners) { scanner->close(state); } @@ -662,7 +662,6 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { _scanner_pool->add(scanner); _olap_scanners.push_back(scanner); - _all_olap_scanners = _olap_scanners; } // init progress @@ -1036,7 +1035,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { for (auto scanner : _olap_scanners) { status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs()); if (!status.ok()) { - boost::lock_guard guard(_status_mutex); + boost::lock_guard guard(_status_mutex); _status = status; break; } @@ -1179,7 +1178,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { if (!scanner->is_open()) { status = scanner->open(); if (!status.ok()) { - boost::lock_guard guard(_status_mutex); + boost::lock_guard guard(_status_mutex); _status = status; eos = true; } @@ -1230,43 +1229,50 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { raw_rows_read = scanner->raw_rows_read(); } - boost::unique_lock l(_scan_batches_lock); - // if we failed, check status. - if (UNLIKELY(!status.ok())) { - _transfer_done = true; - boost::lock_guard guard(_status_mutex); - if (LIKELY(_status.ok())) { - _status = status; + { + boost::unique_lock l(_scan_batches_lock); + // if we failed, check status. + if (UNLIKELY(!status.ok())) { + _transfer_done = true; + boost::lock_guard guard(_status_mutex); + if (LIKELY(_status.ok())) { + _status = status; + } } - } - bool global_status_ok = false; - { - boost::lock_guard guard(_status_mutex); - global_status_ok = _status.ok(); - } - if (UNLIKELY(!global_status_ok)) { - eos = true; - for (auto rb : row_batchs) { - delete rb; + bool global_status_ok = false; + { + boost::lock_guard guard(_status_mutex); + global_status_ok = _status.ok(); } - } else { - for (auto rb : row_batchs) { - _scan_row_batches.push_back(rb); + if (UNLIKELY(!global_status_ok)) { + eos = true; + for (auto rb : row_batchs) { + delete rb; + } + } else { + for (auto rb : row_batchs) { + _scan_row_batches.push_back(rb); + } } + // If eos is true, we will process out of this lock block. + if (!eos) { + _olap_scanners.push_front(scanner); + } + _running_thread--; } - // Scanner thread completed. Take a look and update the status - if (UNLIKELY(eos)) { + if (eos) { + // close out of batches lock. we do this before _progress update + // that can assure this object can keep live before we finish. + scanner->close(_runtime_state); + + boost::unique_lock l(_scan_batches_lock); _progress.update(1); if (_progress.done()) { // this is the right out _scanner_done = true; } - scanner->close(_runtime_state); - } else { - _olap_scanners.push_front(scanner); } - _running_thread--; _scan_batch_added_cv.notify_one(); } diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index c75b1c1bd60cd4..c8ffef25bc05a4 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -35,6 +35,7 @@ #include "runtime/row_batch_interface.hpp" #include "runtime/vectorized_row_batch.h" #include "util/progress_updater.h" +#include "util/spinlock.h" namespace doris { @@ -226,7 +227,6 @@ class OlapScanNode : public ScanNode { std::list _scan_row_batches; - std::list _all_olap_scanners; std::list _olap_scanners; int _max_materialized_row_batches; @@ -240,7 +240,7 @@ class OlapScanNode : public ScanNode { int _nice; // protect _status, for many thread may change _status - boost::mutex _status_mutex; + SpinLock _status_mutex; Status _status; RuntimeState* _runtime_state; RuntimeProfile::Counter* _scan_timer;