diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 2a8ff5a4bb91a5..7160af3424b895 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -266,6 +266,14 @@ class Rowset : public std::enable_shared_from_this { } } + void update_delayed_expired_timestamp(uint64_t delayed_expired_timestamp) { + if (delayed_expired_timestamp > _delayed_expired_timestamp) { + _delayed_expired_timestamp = delayed_expired_timestamp; + } + } + + uint64_t delayed_expired_timestamp() { return _delayed_expired_timestamp; } + virtual Status get_segments_key_bounds(std::vector* segments_key_bounds) { _rowset_meta->get_segments_key_bounds(segments_key_bounds); return Status::OK(); @@ -328,6 +336,7 @@ class Rowset : public std::enable_shared_from_this { std::atomic _refs_by_reader; // rowset state machine RowsetStateMachine _rowset_state_machine; + std::atomic _delayed_expired_timestamp = 0; }; } // namespace doris diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 7c21b4770723ba..f2e242486430cd 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -794,14 +794,17 @@ void Tablet::delete_expired_stale_rowset() { for (auto& timestampedVersion : to_delete_version) { auto it = _stale_rs_version_map.find(timestampedVersion->version()); if (it != _stale_rs_version_map.end()) { - // delete rowset - StorageEngine::instance()->add_unused_rowset(it->second); - _stale_rs_version_map.erase(it); - VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version[" - << timestampedVersion->version().first << "," - << timestampedVersion->version().second - << "] move to unused_rowset success " << std::fixed - << expired_stale_sweep_endtime; + uint64_t now = UnixSeconds(); + if (now > it->second->delayed_expired_timestamp()) { + // delete rowset + StorageEngine::instance()->add_unused_rowset(it->second); + _stale_rs_version_map.erase(it); + VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version[" + << timestampedVersion->version().first << "," + << timestampedVersion->version().second + << "] move to unused_rowset success " << std::fixed + << expired_stale_sweep_endtime; + } } else { LOG(WARNING) << "delete stale rowset tablet=" << full_name() << " version[" << timestampedVersion->version().first << "," diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 605cf6ec150231..ca56bc95bd245a 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -410,6 +410,18 @@ Status NewOlapScanner::_init_tablet_reader_params( ((NewOlapScanNode*)_parent)->_olap_scan_node.use_topn_opt; } + // If this is a Two-Phase read query, and we need to delay the release of Rowset + // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset + if (_tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { + constexpr static int delayed_s = 60; + for (auto rs_reader : _tablet_reader_params.rs_readers) { + uint64_t delayed_expired_timestamp = + UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() + + delayed_s; + rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp); + } + } + return Status::OK(); }