From 95bc393ba78362440871ccae2b6c68a803aa1a39 Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Wed, 14 Jun 2023 12:31:58 +0800 Subject: [PATCH 1/2] [Bug](topn opt) Fix Two-Phase read when some rowset swept If this is a Two-Phase read query, and we need to delay the release of Rowset by row->update_delayed_expired_timestamp() to expand the lifespan of rowsets. This is necessary to avoid data loss during the second phase reading, where some stale rowsets may be swept and result in missing data. For rowsets that have been moved to the unused rowsets, they are also needed in second phase reading. --- be/src/olap/data_dir.cpp | 4 +++- be/src/olap/rowset/rowset.h | 9 +++++++++ be/src/olap/storage_engine.cpp | 13 ++++++++++--- be/src/olap/storage_engine.h | 2 ++ be/src/service/internal_service.cpp | 11 +++++++++-- be/src/vec/exec/scan/new_olap_scanner.cpp | 12 ++++++++++++ 6 files changed, 45 insertions(+), 6 deletions(-) diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index b83ce436356d52..464bc33483edd8 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -627,7 +627,9 @@ void DataDir::perform_path_gc_by_rowsetid() { TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); if (tablet != nullptr) { if (!tablet->check_rowset_id(rowset_id) && - !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) { + !StorageEngine::instance() + ->get_rowset_in_unused_rowsets(rowset_id) + .has_value()) { _process_garbage_path(path); } } diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 2a8ff5a4bb91a5..7160af3424b895 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -266,6 +266,14 @@ class Rowset : public std::enable_shared_from_this { } } + void update_delayed_expired_timestamp(uint64_t delayed_expired_timestamp) { + if (delayed_expired_timestamp > _delayed_expired_timestamp) { + _delayed_expired_timestamp = delayed_expired_timestamp; + } + } + + uint64_t delayed_expired_timestamp() { return _delayed_expired_timestamp; } + virtual Status get_segments_key_bounds(std::vector* segments_key_bounds) { _rowset_meta->get_segments_key_bounds(segments_key_bounds); return Status::OK(); @@ -328,6 +336,7 @@ class Rowset : public std::enable_shared_from_this { std::atomic _refs_by_reader; // rowset state machine RowsetStateMachine _rowset_state_machine; + std::atomic _delayed_expired_timestamp = 0; }; } // namespace doris diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 1d65da4d591f25..263df09455ab4b 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1004,7 +1004,9 @@ void StorageEngine::start_delete_unused_rowset() { { std::lock_guard lock(_gc_mutex); for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) { - if (it->second.use_count() == 1 && it->second->need_delete_file()) { + uint64_t now = UnixSeconds(); + if (it->second.use_count() == 1 && it->second->need_delete_file() && + now > it->second->delayed_expired_timestamp()) { if (it->second->is_local()) { unused_rowsets_copy[it->first] = it->second; } @@ -1155,10 +1157,15 @@ Status StorageEngine::execute_task(EngineTask* task) { } // check whether any unused rowsets's id equal to rowset_id -bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) { +std::optional StorageEngine::get_rowset_in_unused_rowsets( + const RowsetId& rowset_id) { std::lock_guard lock(_gc_mutex); auto search = _unused_rowsets.find(rowset_id.to_string()); - return search != _unused_rowsets.end(); + if (search != _unused_rowsets.end()) { + return search->second; + } + // Not found + return {}; } void StorageEngine::create_cumulative_compaction( diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 42e7bfdef7fc22..63c304393c7e32 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -215,6 +215,8 @@ class StorageEngine { Status process_index_change_task(const TAlterInvertedIndexReq& reqest); + std::optional get_rowset_in_unused_rowsets(const RowsetId& rowset_id); + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index c3ed7313e90d9f..7f2e231df5dd7d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1514,9 +1514,16 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, } BetaRowsetSharedPtr rowset = std::static_pointer_cast(tablet->get_rowset(rowset_id)); + // Get Rowset from either tablet or unused rowsets, since this rowset maybe expired and swept. + // But we ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp(); if (!rowset) { - LOG(INFO) << "no such rowset " << rowset_id; - continue; + std::optional from_unused_rowset = + StorageEngine::instance()->get_rowset_in_unused_rowsets(rowset_id); + if (!from_unused_rowset.has_value()) { + LOG(INFO) << "no such rowset " << rowset_id; + continue; + } + rowset = std::static_pointer_cast(from_unused_rowset.value()); } size_t row_size = 0; Defer _defer([&]() { diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 605cf6ec150231..ca56bc95bd245a 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -410,6 +410,18 @@ Status NewOlapScanner::_init_tablet_reader_params( ((NewOlapScanNode*)_parent)->_olap_scan_node.use_topn_opt; } + // If this is a Two-Phase read query, and we need to delay the release of Rowset + // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset + if (_tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { + constexpr static int delayed_s = 60; + for (auto rs_reader : _tablet_reader_params.rs_readers) { + uint64_t delayed_expired_timestamp = + UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() + + delayed_s; + rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp); + } + } + return Status::OK(); } From 65a90217f1c9f1f634e331eed02a890c031d438c Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Wed, 14 Jun 2023 13:29:46 +0800 Subject: [PATCH 2/2] move delay logic to tablet --- be/src/olap/data_dir.cpp | 4 +--- be/src/olap/storage_engine.cpp | 13 +++---------- be/src/olap/storage_engine.h | 2 -- be/src/olap/tablet.cpp | 19 +++++++++++-------- be/src/service/internal_service.cpp | 11 ++--------- 5 files changed, 17 insertions(+), 32 deletions(-) diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 464bc33483edd8..b83ce436356d52 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -627,9 +627,7 @@ void DataDir::perform_path_gc_by_rowsetid() { TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); if (tablet != nullptr) { if (!tablet->check_rowset_id(rowset_id) && - !StorageEngine::instance() - ->get_rowset_in_unused_rowsets(rowset_id) - .has_value()) { + !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) { _process_garbage_path(path); } } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 263df09455ab4b..1d65da4d591f25 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1004,9 +1004,7 @@ void StorageEngine::start_delete_unused_rowset() { { std::lock_guard lock(_gc_mutex); for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) { - uint64_t now = UnixSeconds(); - if (it->second.use_count() == 1 && it->second->need_delete_file() && - now > it->second->delayed_expired_timestamp()) { + if (it->second.use_count() == 1 && it->second->need_delete_file()) { if (it->second->is_local()) { unused_rowsets_copy[it->first] = it->second; } @@ -1157,15 +1155,10 @@ Status StorageEngine::execute_task(EngineTask* task) { } // check whether any unused rowsets's id equal to rowset_id -std::optional StorageEngine::get_rowset_in_unused_rowsets( - const RowsetId& rowset_id) { +bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) { std::lock_guard lock(_gc_mutex); auto search = _unused_rowsets.find(rowset_id.to_string()); - if (search != _unused_rowsets.end()) { - return search->second; - } - // Not found - return {}; + return search != _unused_rowsets.end(); } void StorageEngine::create_cumulative_compaction( diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 63c304393c7e32..42e7bfdef7fc22 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -215,8 +215,6 @@ class StorageEngine { Status process_index_change_task(const TAlterInvertedIndexReq& reqest); - std::optional get_rowset_in_unused_rowsets(const RowsetId& rowset_id); - private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 7c21b4770723ba..f2e242486430cd 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -794,14 +794,17 @@ void Tablet::delete_expired_stale_rowset() { for (auto& timestampedVersion : to_delete_version) { auto it = _stale_rs_version_map.find(timestampedVersion->version()); if (it != _stale_rs_version_map.end()) { - // delete rowset - StorageEngine::instance()->add_unused_rowset(it->second); - _stale_rs_version_map.erase(it); - VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version[" - << timestampedVersion->version().first << "," - << timestampedVersion->version().second - << "] move to unused_rowset success " << std::fixed - << expired_stale_sweep_endtime; + uint64_t now = UnixSeconds(); + if (now > it->second->delayed_expired_timestamp()) { + // delete rowset + StorageEngine::instance()->add_unused_rowset(it->second); + _stale_rs_version_map.erase(it); + VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version[" + << timestampedVersion->version().first << "," + << timestampedVersion->version().second + << "] move to unused_rowset success " << std::fixed + << expired_stale_sweep_endtime; + } } else { LOG(WARNING) << "delete stale rowset tablet=" << full_name() << " version[" << timestampedVersion->version().first << "," diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 7f2e231df5dd7d..c3ed7313e90d9f 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1514,16 +1514,9 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, } BetaRowsetSharedPtr rowset = std::static_pointer_cast(tablet->get_rowset(rowset_id)); - // Get Rowset from either tablet or unused rowsets, since this rowset maybe expired and swept. - // But we ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp(); if (!rowset) { - std::optional from_unused_rowset = - StorageEngine::instance()->get_rowset_in_unused_rowsets(rowset_id); - if (!from_unused_rowset.has_value()) { - LOG(INFO) << "no such rowset " << rowset_id; - continue; - } - rowset = std::static_pointer_cast(from_unused_rowset.value()); + LOG(INFO) << "no such rowset " << rowset_id; + continue; } size_t row_size = 0; Defer _defer([&]() {