From 02048b2ab268209d23fbf3249920b7fc581c30dd Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Wed, 12 Jul 2023 10:48:42 +0800 Subject: [PATCH 1/2] [Fix](rowset) When a rowset is cooled down, it is directly deleted. This can result in data query misses in the second phase of a two-phase query. Related PR: #20732. There are two reasons for moving the logic of delayed deletion from the Tablet to the StorageEngine. The first reason is to consolidate the logic and unify the delayed operations. The second reason is that delayed garbage collection during queries can cause rowsets to remain in the "stale rowsets" state, preventing the timely deletion of rowset metadata, which may cause the rowset metadata to grow too large. --- be/src/olap/data_dir.cpp | 3 ++- be/src/olap/storage_engine.cpp | 16 +++++++++++++--- be/src/olap/storage_engine.h | 2 +- be/src/olap/tablet.cpp | 5 ----- be/src/olap/task/index_builder.cpp | 2 +- be/src/service/internal_service.cpp | 10 ++++++++-- 6 files changed, 25 insertions(+), 13 deletions(-) diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index ce244f09a543e8..3f93ed340553b0 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -683,7 +683,8 @@ void DataDir::perform_path_gc_by_rowsetid() { TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); if (tablet != nullptr) { if (!tablet->check_rowset_id(rowset_id) && - !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) { + !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id, + nullptr)) { _process_garbage_path(path); } } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index cc37ced5b5feb4..746a4e062e72d0 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -903,7 +903,10 @@ void StorageEngine::start_delete_unused_rowset() { { std::lock_guard lock(_gc_mutex); for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) { - if
(it->second.use_count() == 1 && it->second->need_delete_file()) { + uint64_t now = UnixSeconds(); + if (it->second.use_count() == 1 && it->second->need_delete_file() && + // We delay the GC time of this rowset since it's maybe still needed, see #20732 + now > it->second->delayed_expired_timestamp()) { if (it->second->is_local()) { unused_rowsets_copy[it->first] = it->second; } @@ -1052,10 +1055,17 @@ Status StorageEngine::execute_task(EngineTask* task) { } // check whether any unused rowsets's id equal to rowset_id -bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) { +bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id, + RowsetSharedPtr* rs) { std::lock_guard lock(_gc_mutex); auto search = _unused_rowsets.find(rowset_id.to_string()); - return search != _unused_rowsets.end(); + if (search != _unused_rowsets.end()) { + if (rs) { + *rs = search->second; + } + return true; + } + return false; } void StorageEngine::create_cumulative_compaction( diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index d6215586eda9df..9134c5f0ccf916 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -144,7 +144,7 @@ class StorageEngine { TxnManager* txn_manager() { return _txn_manager.get(); } MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); } - bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id); + bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id, RowsetSharedPtr* rs); RowsetId next_rowset_id() { return _rowset_id_generator->next_id(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index eea7c666328c34..efaaf2fffe8923 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -781,11 +781,6 @@ void Tablet::delete_expired_stale_rowset() { for (auto& timestampedVersion : to_delete_version) { auto it = _stale_rs_version_map.find(timestampedVersion->version()); if (it != 
_stale_rs_version_map.end()) { - uint64_t now = UnixSeconds(); - if (now <= it->second->delayed_expired_timestamp()) { - // Some rowsets gc time was delayed, ignore - continue; - } // delete rowset StorageEngine::instance()->add_unused_rowset(it->second); _stale_rs_version_map.erase(it); diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index 67e39104c6fbc9..89c318a5514280 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -436,7 +436,7 @@ Status IndexBuilder::do_build_inverted_index() { Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) { for (auto rowset_ptr : _output_rowsets) { auto rowset_id = rowset_ptr->rowset_id(); - if (StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) { + if (StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id, nullptr)) { DCHECK(false) << "output rowset: " << rowset_id.to_string() << " in unused rowsets"; } } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 05d9f16cfe9daa..cff1e94983ccd9 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1540,11 +1540,17 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, if (!tablet) { continue; } + // Get Rowset from either tablet or unused rowsets, since this rowset maybe expired and swept. 
+ // But we ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp(); BetaRowsetSharedPtr rowset = std::static_pointer_cast(tablet->get_rowset(rowset_id)); if (!rowset) { - LOG(INFO) << "no such rowset " << rowset_id; - continue; + RowsetSharedPtr rs; + if (!StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id, &rs)) { + LOG(INFO) << "no such rowset " << rowset_id; + continue; + } + rowset = std::static_pointer_cast(rs); } size_t row_size = 0; Defer _defer([&]() { From 3dd39f1303ba7f5749e6b03959fc1c5383215220 Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Wed, 12 Jul 2023 16:13:47 +0800 Subject: [PATCH 2/2] not use unused rowsets --- be/src/olap/data_dir.cpp | 3 +-- be/src/olap/storage_engine.cpp | 31 ++++++++++++++++------- be/src/olap/storage_engine.h | 12 ++++++++- be/src/olap/task/index_builder.cpp | 2 +- be/src/service/internal_service.cpp | 15 ++++------- be/src/vec/exec/scan/new_olap_scanner.cpp | 1 + 6 files changed, 41 insertions(+), 23 deletions(-) diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 3f93ed340553b0..ce244f09a543e8 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -683,8 +683,7 @@ void DataDir::perform_path_gc_by_rowsetid() { TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); if (tablet != nullptr) { if (!tablet->check_rowset_id(rowset_id) && - !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id, - nullptr)) { + !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) { _process_garbage_path(path); } } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 746a4e062e72d0..f573b59f6027ad 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -911,6 +911,7 @@ void StorageEngine::start_delete_unused_rowset() { unused_rowsets_copy[it->first] = it->second; } // remote rowset data will be reclaimed by 
`remove_unused_remote_files` + evict_querying_rowset(it->second->rowset_id()); it = _unused_rowsets.erase(it); } else { ++it; @@ -1055,17 +1056,10 @@ Status StorageEngine::execute_task(EngineTask* task) { } // check whether any unused rowsets's id equal to rowset_id -bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id, - RowsetSharedPtr* rs) { +bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) { std::lock_guard lock(_gc_mutex); auto search = _unused_rowsets.find(rowset_id.to_string()); - if (search != _unused_rowsets.end()) { - if (rs) { - *rs = search->second; - } - return true; - } - return false; + return search != _unused_rowsets.end(); } void StorageEngine::create_cumulative_compaction( @@ -1179,4 +1173,23 @@ Status StorageEngine::get_compaction_status_json(std::string* result) { return Status::OK(); } +void StorageEngine::add_quering_rowset(RowsetSharedPtr rs) { + std::lock_guard lock(_quering_rowsets_mutex); + _querying_rowsets.emplace(rs->rowset_id(), rs); +} + +RowsetSharedPtr StorageEngine::get_quering_rowset(RowsetId rs_id) { + std::lock_guard lock(_quering_rowsets_mutex); + auto it = _querying_rowsets.find(rs_id); + if (it != _querying_rowsets.end()) { + return it->second; + } + return nullptr; +} + +void StorageEngine::evict_querying_rowset(RowsetId rs_id) { + std::lock_guard lock(_quering_rowsets_mutex); + _querying_rowsets.erase(rs_id); +} + } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 9134c5f0ccf916..2bd01ba2cb29a2 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -144,7 +144,7 @@ class StorageEngine { TxnManager* txn_manager() { return _txn_manager.get(); } MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); } - bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id, RowsetSharedPtr* rs); + bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id); 
RowsetId next_rowset_id() { return _rowset_id_generator->next_id(); } @@ -224,6 +224,12 @@ class StorageEngine { int64_t transaction_id, bool is_recover); int64_t get_pending_publish_min_version(int64_t tablet_id); + void add_quering_rowset(RowsetSharedPtr rs); + + RowsetSharedPtr get_quering_rowset(RowsetId rs_id); + + void evict_querying_rowset(RowsetId rs_id); + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. @@ -372,6 +378,10 @@ class StorageEngine { // map, if we use RowsetId as the key, we need custom hash func std::unordered_map _unused_rowsets; + // Hold reference of quering rowsets + std::mutex _quering_rowsets_mutex; + std::unordered_map _querying_rowsets; + // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; // This mem tracker is only for tracking memory use by segment meta data such as footer or index page. diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index 89c318a5514280..67e39104c6fbc9 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -436,7 +436,7 @@ Status IndexBuilder::do_build_inverted_index() { Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) { for (auto rowset_ptr : _output_rowsets) { auto rowset_id = rowset_ptr->rowset_id(); - if (StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id, nullptr)) { + if (StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) { DCHECK(false) << "output rowset: " << rowset_id.to_string() << " in unused rowsets"; } } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index cff1e94983ccd9..745de1bb71abf5 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1540,17 +1540,12 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, if (!tablet) { continue; } - // Get Rowset from 
either tablet or unused rowsets, since this rowset maybe expired and swept. - // But we ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp(); - BetaRowsetSharedPtr rowset = - std::static_pointer_cast(tablet->get_rowset(rowset_id)); + // We ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp(); + BetaRowsetSharedPtr rowset = std::static_pointer_cast( + StorageEngine::instance()->get_quering_rowset(rowset_id)); if (!rowset) { - RowsetSharedPtr rs; - if (!StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id, &rs)) { - LOG(INFO) << "no such rowset " << rowset_id; - continue; - } - rowset = std::static_pointer_cast(rs); + LOG(INFO) << "no such rowset " << rowset_id; + continue; } size_t row_size = 0; Defer _defer([&]() { diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 0b631b143e58f9..b945d42f6c798d 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -413,6 +413,7 @@ Status NewOlapScanner::_init_tablet_reader_params( UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() + delayed_s; rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp); + StorageEngine::instance()->add_quering_rowset(rs_reader->rowset()); } }