diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 8c2e838bf75..bf565cc72d4 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -70,7 +70,8 @@ std::unordered_map> FailPointHelper::f M(exception_before_drop_segment) \ M(exception_after_drop_segment) \ M(exception_between_schema_change_in_the_same_diff) \ - M(force_ps_wal_compact) + M(force_ps_wal_compact) \ + M(pause_before_full_gc_prepare) #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 338d5ef004b..1139ef4fd9c 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -236,11 +236,14 @@ namespace DB F(type_mpp_query_count, {"type", "mpp_query_count"})) \ // clang-format on +/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)] struct ExpBuckets { const double start; const int base; const size_t size; + + // NOLINTNEXTLINE(google-explicit-constructor) inline operator prometheus::Histogram::BucketBoundaries() const && { prometheus::Histogram::BucketBoundaries buckets(size); @@ -259,6 +262,8 @@ struct EqualWidthBuckets const size_t start; const int num_buckets; const size_t step; + + // NOLINTNEXTLINE(google-explicit-constructor) inline operator prometheus::Histogram::BucketBoundaries() const && { // up to `num_buckets` * `step` @@ -384,12 +389,17 @@ class TiFlashMetrics APPLY_FOR_METRICS(MAKE_METRIC_ENUM_M, MAKE_METRIC_ENUM_F) #undef APPLY_FOR_METRICS +// NOLINTNEXTLINE(bugprone-reserved-identifier) #define __GET_METRIC_MACRO(_1, _2, NAME, ...) NAME #ifndef GTEST_TIFLASH_METRICS +// NOLINTNEXTLINE(bugprone-reserved-identifier) #define __GET_METRIC_0(family) TiFlashMetrics::instance().family.get() +// NOLINTNEXTLINE(bugprone-reserved-identifier) #define __GET_METRIC_1(family, metric) TiFlashMetrics::instance().family.get(family##_metrics::metric) #else +// NOLINTNEXTLINE(bugprone-reserved-identifier) #define __GET_METRIC_0(family) TestMetrics::instance().family.get() +// NOLINTNEXTLINE(bugprone-reserved-identifier) #define __GET_METRIC_1(family, metric) TestMetrics::instance().family.get(family##_metrics::metric) #endif #define GET_METRIC(...) \ diff --git a/dbms/src/Storages/Page/Page.h b/dbms/src/Storages/Page/Page.h index 3fe8cb8f060..974234bf1dd 100644 --- a/dbms/src/Storages/Page/Page.h +++ b/dbms/src/Storages/Page/Page.h @@ -108,7 +108,6 @@ struct Page using Pages = std::vector; using PageMap = std::map; -using PageHandler = std::function; // TODO: Move it into V2 // Indicate the page size && offset in PageFile. diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 451f0f22d9e..0d323feba7f 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -53,8 +53,6 @@ class PageReaderImpl : private boost::noncopyable virtual PageMap read(const PageIds & page_ids) const = 0; - virtual void read(const PageIds & page_ids, PageHandler & handler) const = 0; - using PageReadFields = PageStorage::PageReadFields; virtual PageMap read(const std::vector & page_fields) const = 0; @@ -103,11 +101,6 @@ class PageReaderImplNormal : public PageReaderImpl return storage->read(ns_id, page_ids, read_limiter, snap); } - void read(const PageIds & page_ids, PageHandler & handler) const override - { - storage->read(ns_id, page_ids, handler, read_limiter, snap); - } - using PageReadFields = PageStorage::PageReadFields; PageMap read(const std::vector & page_fields) const override { @@ -218,12 +211,6 @@ class PageReaderImplMixed : public PageReaderImpl return page_maps; } - void read(const PageIds & page_ids, PageHandler & handler) const override - { - const auto & page_ids_not_found = storage_v3->read(ns_id, page_ids, handler, read_limiter, toConcreteV3Snapshot(), false); - storage_v2->read(ns_id, page_ids_not_found, handler, read_limiter, toConcreteV2Snapshot()); - } - using PageReadFields = PageStorage::PageReadFields; PageMap read(const std::vector & page_fields) const override { @@ -406,11 +393,6 @@ PageMap PageReader::read(const PageIds & page_ids) const return impl->read(page_ids); } -void PageReader::read(const PageIds & page_ids, PageHandler & handler) const -{ - impl->read(page_ids, handler); -} - PageMap PageReader::read(const std::vector & page_fields) const { return impl->read(page_fields); diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index f232f2e84d3..bb6835298ce 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -169,15 +169,6 @@ class PageStorage : private boost::noncopyable return readImpl(ns_id, page_ids, read_limiter, snapshot, throw_on_not_exist); } - /** - * If throw_on_not_exist is false, Also we do have some of page_id not found. - * Then the return value will record the all of page_id which not found. - */ - PageIds read(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}, bool throw_on_not_exist = true) - { - return readImpl(ns_id, page_ids, handler, read_limiter, snapshot, throw_on_not_exist); - } - using FieldIndices = std::vector; using PageReadFields = std::pair; @@ -223,8 +214,6 @@ class PageStorage : private boost::noncopyable virtual PageMap readImpl(NamespaceId ns_id, const PageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0; - virtual PageIds readImpl(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0; - virtual PageMap readImpl(NamespaceId ns_id, const std::vector & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0; virtual Page readImpl(NamespaceId ns_id, const PageReadFields & page_field, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0; @@ -261,8 +250,6 @@ class PageReader : private boost::noncopyable PageMap read(const PageIds & page_ids) const; - void read(const PageIds & page_ids, PageHandler & handler) const; - using PageReadFields = PageStorage::PageReadFields; PageMap read(const std::vector & page_fields) const; diff --git a/dbms/src/Storages/Page/V2/PageFile.cpp b/dbms/src/Storages/Page/V2/PageFile.cpp index a722f2758fd..a0c501f2bb8 100644 --- a/dbms/src/Storages/Page/V2/PageFile.cpp +++ b/dbms/src/Storages/Page/V2/PageFile.cpp @@ -935,63 +935,6 @@ PageMap PageFile::Reader::read(PageIdAndEntries & to_read, const ReadLimiterPtr return page_map; } -void PageFile::Reader::read(PageIdAndEntries & to_read, const PageHandler & handler, const ReadLimiterPtr & read_limiter) -{ - ProfileEvents::increment(ProfileEvents::PSMReadPages, to_read.size()); - - // Sort in ascending order by offset in file. - std::sort(to_read.begin(), to_read.end(), [](const PageIdAndEntry & a, const PageIdAndEntry & b) { - return a.second.offset < b.second.offset; - }); - - size_t buf_size = 0; - for (const auto & p : to_read) - buf_size = std::max(buf_size, p.second.size); - - char * data_buf = static_cast(alloc(buf_size)); - MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) { free(p, buf_size); }); - - - auto it = to_read.begin(); - while (it != to_read.end()) - { - auto && [page_id, entry] = *it; - - PageUtil::readFile(data_file, entry.offset, data_buf, entry.size, read_limiter); - - if constexpr (PAGE_CHECKSUM_ON_READ) - { - auto checksum = CityHash_v1_0_2::CityHash64(data_buf, entry.size); - if (unlikely(entry.size != 0 && checksum != entry.checksum)) - { - std::stringstream ss; - ss << ", expected: " << std::hex << entry.checksum << ", but: " << checksum; - throw Exception("Page [" + DB::toString(page_id) + "] checksum not match, broken file: " + data_file_path + ss.str(), - ErrorCodes::CHECKSUM_DOESNT_MATCH); - } - } - - Page page; - page.page_id = page_id; - page.data = ByteBuffer(data_buf, data_buf + entry.size); - page.mem_holder = mem_holder; - - ++it; - - //#ifndef __APPLE__ - // if (it != to_read.end()) - // { - // auto & next_page_cache = it->second; - // ::posix_fadvise(data_file_fd, next_page_cache.offset, next_page_cache.size, POSIX_FADV_WILLNEED); - // } - //#endif - - handler(page_id, page); - } - - last_read_time = Clock::now(); -} - PageMap PageFile::Reader::read(PageFile::Reader::FieldReadInfos & to_read, const ReadLimiterPtr & read_limiter) { ProfileEvents::increment(ProfileEvents::PSMReadPages, to_read.size()); diff --git a/dbms/src/Storages/Page/V2/PageFile.h b/dbms/src/Storages/Page/V2/PageFile.h index 685eee05967..d3209b99936 100644 --- a/dbms/src/Storages/Page/V2/PageFile.h +++ b/dbms/src/Storages/Page/V2/PageFile.h @@ -82,8 +82,6 @@ class PageFile : public Allocator /// After return, the items in to_read could be reordered, but won't be removed or added. PageMap read(PageIdAndEntries & to_read, const ReadLimiterPtr & read_limiter = nullptr, bool background = false); - void read(PageIdAndEntries & to_read, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr); - struct FieldReadInfo { PageId page_id; diff --git a/dbms/src/Storages/Page/V2/PageStorage.cpp b/dbms/src/Storages/Page/V2/PageStorage.cpp index ea24b855124..624f68b441b 100644 --- a/dbms/src/Storages/Page/V2/PageStorage.cpp +++ b/dbms/src/Storages/Page/V2/PageStorage.cpp @@ -697,53 +697,6 @@ PageMap PageStorage::readImpl(NamespaceId /*ns_id*/, const PageIds & page_ids, c return page_map; } -PageIds PageStorage::readImpl(NamespaceId /*ns_id*/, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) -{ - if (!snapshot) - { - snapshot = this->getSnapshot(""); - } - - if (!throw_on_not_exist) - { - throw Exception("Not support throw_on_not_exist on V2", ErrorCodes::NOT_IMPLEMENTED); - } - - std::map> file_read_infos; - for (auto page_id : page_ids) - { - const auto page_entry = toConcreteSnapshot(snapshot)->version()->find(page_id); - if (!page_entry) - throw Exception(fmt::format("Page {} not found", page_id), ErrorCodes::LOGICAL_ERROR); - auto file_id_level = page_entry->fileIdLevel(); - auto & [page_id_and_entries, file_reader] = file_read_infos[file_id_level]; - page_id_and_entries.emplace_back(page_id, *page_entry); - if (file_reader == nullptr) - { - try - { - file_reader = getReader(file_id_level); - } - catch (DB::Exception & e) - { - e.addMessage(fmt::format("(while reading Page[{}] of {})", page_id, storage_name)); - throw; - } - } - } - - for (auto & [file_id_level, entries_and_reader] : file_read_infos) - { - (void)file_id_level; - auto & page_id_and_entries = entries_and_reader.first; - auto & reader = entries_and_reader.second; - - reader->read(page_id_and_entries, handler, read_limiter); - } - - return {}; -} - PageMap PageStorage::readImpl(NamespaceId /*ns_id*/, const std::vector & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) { if (!snapshot) diff --git a/dbms/src/Storages/Page/V2/PageStorage.h b/dbms/src/Storages/Page/V2/PageStorage.h index ffdd5028a1c..669e2efb629 100644 --- a/dbms/src/Storages/Page/V2/PageStorage.h +++ b/dbms/src/Storages/Page/V2/PageStorage.h @@ -120,8 +120,6 @@ class PageStorage : public DB::PageStorage PageMap readImpl(NamespaceId ns_id, const PageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override; - PageIds readImpl(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override; - PageMap readImpl(NamespaceId ns_id, const std::vector & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override; DB::Page readImpl(NamespaceId ns_id, const PageReadFields & page_field, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override; @@ -209,7 +207,6 @@ class PageStorage : public DB::PageStorage DB::Page read(PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) { return readImpl(TEST_NAMESPACE_ID, page_id, read_limiter, snapshot, true); } PageMap read(const PageIds & page_ids) { return readImpl(TEST_NAMESPACE_ID, page_ids, nullptr, nullptr, true); } PageMap read(const PageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) { return readImpl(TEST_NAMESPACE_ID, page_ids, read_limiter, snapshot, true); }; - PageIds read(const PageIds & page_ids, const PageHandler & handler) { return readImpl(TEST_NAMESPACE_ID, page_ids, handler, nullptr, nullptr, true); } PageMap read(const std::vector & page_fields) { return readImpl(TEST_NAMESPACE_ID, page_fields, nullptr, nullptr, true); } void traverse(const std::function & acceptor) { return traverseImpl(acceptor, nullptr); } bool gc() { return gcImpl(false, nullptr, nullptr); } diff --git a/dbms/src/Storages/Page/V2/tests/gtest_page_storage_multi_writers.cpp b/dbms/src/Storages/Page/V2/tests/gtest_page_storage_multi_writers.cpp index 58aca9a177b..63a750e1c23 100644 --- a/dbms/src/Storages/Page/V2/tests/gtest_page_storage_multi_writers.cpp +++ b/dbms/src/Storages/Page/V2/tests/gtest_page_storage_multi_writers.cpp @@ -224,16 +224,16 @@ class PSReader : public Poco::Runnable LOG_TRACE(&Poco::Logger::get("root"), e.displayText()); } #else - PageIds pageIds; + PageIds page_ids; for (size_t i = 0; i < 5; ++i) { - pageIds.emplace_back(random() % ctx.MAX_PAGE_ID); + page_ids.emplace_back(random() % ctx.MAX_PAGE_ID); } try { - // std::function; - PageHandler handler = [&](PageId page_id, const Page & page) { - (void)page_id; + auto page_map = storage->read(page_ids); + for (const auto & page : page_map) + { // use `sleep` to mock heavy read if (heavy_read_delay_ms > 0) { @@ -241,9 +241,8 @@ class PSReader : public Poco::Runnable usleep(heavy_read_delay_ms * 1000); } ++pages_read; - bytes_read += page.data.size(); - }; - storage->read(pageIds, handler); + bytes_read += page.second.data.size(); + } } catch (DB::Exception & e) { diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 6dee4f0b088..52062c68df0 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -526,73 +526,6 @@ void BlobStore::removePosFromStats(BlobFileId blob_id, BlobFileOffset offset, si } } -void BlobStore::read(PageIDAndEntriesV3 & entries, const PageHandler & handler, const ReadLimiterPtr & read_limiter) -{ - if (entries.empty()) - { - return; - } - - ProfileEvents::increment(ProfileEvents::PSMReadPages, entries.size()); - - // Sort in ascending order by offset in file. - std::sort(entries.begin(), entries.end(), [](const auto & a, const auto & b) { - return a.second.offset < b.second.offset; - }); - - // allocate data_buf that can hold all pages - size_t buf_size = 0; - for (const auto & p : entries) - buf_size = std::max(buf_size, p.second.size); - - // When we read `WriteBatch` which is `WriteType::PUT_EXTERNAL`. - // The `buf_size` will be 0, we need avoid calling malloc/free with size 0. - if (buf_size == 0) - { - for (const auto & [page_id_v3, entry] : entries) - { - (void)entry; - LOG_DEBUG(log, "Read entry [page_id={}] without entry size.", page_id_v3); - Page page(page_id_v3); - handler(page_id_v3.low, page); - } - return; - } - - char * data_buf = static_cast(alloc(buf_size)); - MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) { - free(p, buf_size); - }); - - for (const auto & [page_id_v3, entry] : entries) - { - auto blob_file = read(page_id_v3, entry.file_id, entry.offset, data_buf, entry.size, read_limiter); - - if constexpr (BLOBSTORE_CHECKSUM_ON_READ) - { - ChecksumClass digest; - digest.update(data_buf, entry.size); - auto checksum = digest.checksum(); - if (unlikely(entry.size != 0 && checksum != entry.checksum)) - { - throw Exception( - fmt::format("Reading with entries meet checksum not match [page_id={}] [expected=0x{:X}] [actual=0x{:X}] [entry={}] [file={}]", - page_id_v3, - entry.checksum, - checksum, - toDebugString(entry), - blob_file->getPath()), - ErrorCodes::CHECKSUM_DOESNT_MATCH); - } - } - - Page page(page_id_v3); - page.data = ByteBuffer(data_buf, data_buf + entry.size); - page.mem_holder = mem_holder; - handler(page_id_v3.low, page); - } -} - PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_limiter) { if (to_read.empty()) diff --git a/dbms/src/Storages/Page/V3/BlobStore.h b/dbms/src/Storages/Page/V3/BlobStore.h index c0357d353ab..d33377d26ce 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.h +++ b/dbms/src/Storages/Page/V3/BlobStore.h @@ -68,8 +68,6 @@ class BlobStore : private Allocator Page read(const PageIDAndEntryV3 & entry, const ReadLimiterPtr & read_limiter = nullptr); - void read(PageIDAndEntriesV3 & entries, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr); - struct FieldReadInfo { PageIdV3Internal page_id; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 2cf551e47f1..ea21000443b 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,7 @@ #include #include + #ifdef FIU_ENABLE #include @@ -52,6 +54,7 @@ namespace DB namespace FailPoints { extern const char random_slow_page_storage_remove_expired_snapshots[]; +extern const char pause_before_full_gc_prepare[]; } // namespace FailPoints namespace ErrorCodes @@ -1209,10 +1212,13 @@ void PageDirectory::applyRefEditRecord( resolved_ver)); } + SYNC_FOR("before_PageDirectory::applyRefEditRecord_create_ref"); + // use the resolved_id to collapse ref chain 3->2, 2->1 ==> 3->1 bool is_ref_created = version_list->createNewRef(version, resolved_id); if (is_ref_created) { + SYNC_FOR("before_PageDirectory::applyRefEditRecord_incr_ref_count"); // Add the ref-count of being-ref entry if (auto resolved_iter = mvcc_table_directory.find(resolved_id); resolved_iter != mvcc_table_directory.end()) { @@ -1229,6 +1235,7 @@ void PageDirectory::applyRefEditRecord( resolved_ver)); } } + SYNC_FOR("after_PageDirectory::applyRefEditRecord_incr_ref_count"); } void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter) @@ -1377,6 +1384,10 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con // do scan on the version list without lock on `mvcc_table_directory`. auto page_id = iter->first; const auto & version_entries = iter->second; + fiu_do_on(FailPoints::pause_before_full_gc_prepare, { + if (page_id.low == 101) + SYNC_FOR("before_PageDirectory::getEntriesByBlobIds_id_101"); + }); auto single_page_size = version_entries->getEntriesByBlobIds(blob_id_set, page_id, blob_versioned_entries, ref_ids_maybe_rewrite); total_page_size += single_page_size; if (single_page_size != 0) diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index 38664aae5f0..3a1c8c1cf90 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -202,31 +202,6 @@ PageMap PageStorageImpl::readImpl(NamespaceId ns_id, const PageIds & page_ids, c } } -PageIds PageStorageImpl::readImpl(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) -{ - if (!snapshot) - { - snapshot = this->getSnapshot(""); - } - - PageIdV3Internals page_id_v3s; - for (auto p_id : page_ids) - page_id_v3s.emplace_back(buildV3Id(ns_id, p_id)); - - if (throw_on_not_exist) - { - auto page_entries = page_directory->getByIDs(page_id_v3s, snapshot); - blob_store.read(page_entries, handler, read_limiter); - return {}; - } - else - { - auto [page_entries, page_ids_not_found] = page_directory->getByIDsOrNull(page_id_v3s, snapshot); - blob_store.read(page_entries, handler, read_limiter); - return page_ids_not_found; - } -} - PageMap PageStorageImpl::readImpl(NamespaceId ns_id, const std::vector & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) { if (!snapshot) diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index 0eefa04fe7c..09f1c86620b 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -66,8 +66,6 @@ class PageStorageImpl : public DB::PageStorage PageMap readImpl(NamespaceId ns_id, const PageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override; - PageIds readImpl(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override; - PageMap readImpl(NamespaceId ns_id, const std::vector & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override; Page readImpl(NamespaceId ns_id, const PageReadFields & page_field, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override; @@ -91,7 +89,6 @@ class PageStorageImpl : public DB::PageStorage DB::PageEntry getEntry(PageId page_id) { return getEntryImpl(TEST_NAMESPACE_ID, page_id, nullptr); } DB::Page read(PageId page_id) { return readImpl(TEST_NAMESPACE_ID, page_id, nullptr, nullptr, true); } PageMap read(const PageIds & page_ids) { return readImpl(TEST_NAMESPACE_ID, page_ids, nullptr, nullptr, true); } - PageIds read(const PageIds & page_ids, const PageHandler & handler) { return readImpl(TEST_NAMESPACE_ID, page_ids, handler, nullptr, nullptr, true); } PageMap read(const std::vector & page_fields) { return readImpl(TEST_NAMESPACE_ID, page_fields, nullptr, nullptr, true); } // clang-format on #endif diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index a2ca7f6ab26..ab67058d22c 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -407,33 +407,24 @@ try const auto & page2 = page_storage->readImpl(TEST_NAMESPACE_ID, 2, nullptr, nullptr, false); ASSERT_FALSE(page2.isValid()); - std::vector fields; - PageStorage::PageReadFields field1; - field1.first = 4; - field1.second = {0, 1, 2}; - fields.emplace_back(field1); - - PageStorage::PageReadFields field2; - field2.first = 6; - field2.second = {0, 1, 2}; - fields.emplace_back(field2); + std::vector fields{ + {4, {0, 1, 2}}, + {6, {0, 1, 2}}, + {2, {0, 1, 2}}, + {5, {0, 1, 2}}, + }; page_maps = page_storage->readImpl(TEST_NAMESPACE_ID, fields, nullptr, nullptr, false); ASSERT_EQ(page_maps[4].page_id, 4); - ASSERT_FALSE(page_maps[6].isValid()); - - PageIds page_ids_not_found = page_storage->readImpl( - TEST_NAMESPACE_ID, - page_ids, - [](PageId /*page_id*/, const Page & /*page*/) {}, - nullptr, - nullptr, - false); - - std::sort(page_ids_not_found.begin(), page_ids_not_found.end()); - ASSERT_EQ(page_ids_not_found.size(), 2); - ASSERT_EQ(page_ids_not_found[0], 2); - ASSERT_EQ(page_ids_not_found[1], 5); + ASSERT_EQ(page_maps[4].fieldSize(), 3); + ASSERT_EQ(page_maps[4].data.size(), 20 + 20 + 30); + // the invalid page ids in input param are returned with INVALID_ID + ASSERT_GT(page_maps.count(6), 0); + ASSERT_EQ(page_maps[6].isValid(), false); + ASSERT_GT(page_maps.count(2), 0); + ASSERT_EQ(page_maps[2].isValid(), false); + ASSERT_GT(page_maps.count(5), 0); + ASSERT_EQ(page_maps[5].isValid(), false); } } CATCH diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp index 6e6e9bceba7..a04856c4254 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -32,7 +33,8 @@ namespace DB namespace FailPoints { extern const char force_ps_wal_compact[]; -} +extern const char pause_before_full_gc_prepare[]; +} // namespace FailPoints namespace PS::V3::tests { @@ -323,5 +325,158 @@ INSTANTIATE_TEST_CASE_P( return String(magic_enum::enum_name(param.param)); }); +/////// +/// PageStorageFullGCConcurrentTest2 +/////// + +// The full GC start timing when run concurrently +// with creating ref page +enum class StartTiming +{ + BeforeCreateRef = 1, + BeforeIncrRefCount, + AfterIncrRefCount, +}; +class PageStorageFullGCConcurrentTest2 + : public PageStorageTest + , public testing::WithParamInterface +{ +public: + PageStorageFullGCConcurrentTest2() + : timing(GetParam()) + { + } + + void SetUp() override + { + PageStorageTest::SetUp(); + } + + SyncPointScopeGuard getSyncPoint() const + { + switch (timing) + { + case StartTiming::BeforeCreateRef: + return SyncPointCtl::enableInScope("before_PageDirectory::applyRefEditRecord_create_ref"); + case StartTiming::BeforeIncrRefCount: + return SyncPointCtl::enableInScope("before_PageDirectory::applyRefEditRecord_incr_ref_count"); + case StartTiming::AfterIncrRefCount: + return SyncPointCtl::enableInScope("after_PageDirectory::applyRefEditRecord_incr_ref_count"); + } + } + +protected: + StartTiming timing; +}; + +TEST_P(PageStorageFullGCConcurrentTest2, CreateRefPage) +try +{ + // always pick all blob file for full gc + PageStorageConfig new_config; + new_config.blob_heavy_gc_valid_rate = 1.0; + page_storage->reloadSettings(new_config); + + PageId page_id1 = 101; + PageId ref_page_id2 = 102; + PageId ref_page_id3 = 103; + PageId ref_page_id4 = 104; + { + WriteBatch batch; + batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); + batch.putRefPage(ref_page_id2, page_id1); + batch.delPage(page_id1); + page_storage->write(std::move(batch)); + } + { + WriteBatch batch; + batch.putRefPage(ref_page_id3, ref_page_id2); + batch.delPage(ref_page_id2); + page_storage->write(std::move(batch)); + } + + FailPointHelper::enableFailPoint(FailPoints::force_ps_wal_compact); + FailPointHelper::enableFailPoint(FailPoints::pause_before_full_gc_prepare); + auto sp_full_gc_prepare = SyncPointCtl::enableInScope("before_PageDirectory::getEntriesByBlobIds_id_101"); + auto th_full_gc = std::async([&]() { + // let's try full gc, it should rewrite the ref-pages into normal pages + auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); + EXPECT_EQ(true, done_full_gc); + }); + // let's begin full gc and stop before collecting for id=101 + sp_full_gc_prepare.waitAndPause(); + + auto sp_foreground_write = getSyncPoint(); + auto th_foreground_write = std::async([&]() { + WriteBatch batch; + batch.putRefPage(ref_page_id4, ref_page_id3); + page_storage->write(std::move(batch)); + }); + // let's create the ref page with sync_point + sp_foreground_write.waitAndPause(); + + + // let's continue the full gc + sp_full_gc_prepare.next(); + // ... then continue the foreground write + sp_foreground_write.next(); + sp_full_gc_prepare.disable(); + sp_foreground_write.disable(); + + // finish + th_full_gc.get(); + th_foreground_write.get(); + + // wal compact again + page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); + + LOG_INFO(log, "close and restore WAL from disk"); + page_storage.reset(); + + // The living ref-pages (id3, id4) are rewrite into + // normal pages. + auto [wal, reader] = WALStore::create(String(NAME), file_provider, delegator, WALConfig::from(new_config)); + UNUSED(wal); + size_t num_entries_on_wal = 0; + bool exist_id3_normal_entry = false; + bool exist_id4_normal_entry = false; + while (reader->remained()) + { + auto s = reader->next(); + if (s.has_value()) + { + auto e = ser::deserializeFrom(s.value()); + num_entries_on_wal += e.size(); + for (const auto & r : e.getRecords()) + { + if (r.type == EditRecordType::VAR_ENTRY) + { + if (r.page_id.low == ref_page_id3) + exist_id3_normal_entry = true; + else if (r.page_id.low == ref_page_id4) + exist_id4_normal_entry = true; + } + LOG_INFO(log, PageEntriesEdit::toDebugString(r)); + } + } + } + ASSERT_TRUE(exist_id3_normal_entry); + ASSERT_TRUE(exist_id4_normal_entry); + ASSERT_EQ(num_entries_on_wal, 2); +} +CATCH + +INSTANTIATE_TEST_CASE_P( + StartTiming, + PageStorageFullGCConcurrentTest2, + ::testing::Values( + StartTiming::BeforeCreateRef, + StartTiming::BeforeIncrRefCount, + StartTiming::AfterIncrRefCount // + ), + [](const ::testing::TestParamInfo & param) { + return String(magic_enum::enum_name(param.param)); + }); + } // namespace PS::V3::tests } // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp index 54aa26ca71f..d734bb759b9 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp @@ -273,17 +273,6 @@ try ASSERT_PAGE_EQ(c_buff, buf_sz, page_maps[7], 7); } - { - PageIds page_ids = {1, 2, 3, 4}; - PageHandler hander = [](DB::PageId /*page_id*/, const Page & /*page*/) { - }; - ASSERT_NO_THROW(page_reader_mix->read(page_ids, hander)); - - // Read page ids which only exited in V2 - page_ids = {1, 2, 7}; - ASSERT_NO_THROW(page_reader_mix->read(page_ids, hander)); - } - { std::vector read_fields; read_fields.emplace_back(std::pair(2, {1, 3, 6})); diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index be9ae1756a9..01fd89c0377 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -257,17 +257,16 @@ bool PSReader::runImpl() { DB::PageIds page_ids = genRandomPageIds(); - DB::PageHandler handler = [&](DB::PageId page_id, const DB::Page & page) { - (void)page_id; - // use `sleep` to mock heavy read + auto page_map = ps->read(DB::TEST_NAMESPACE_ID, page_ids); + for (const auto & page : page_map) + { if (heavy_read_delay_ms > 0) { usleep(heavy_read_delay_ms * 1000); } ++pages_used; - bytes_used += page.data.size(); - }; - ps->read(DB::TEST_NAMESPACE_ID, page_ids, handler); + bytes_used += page.second.data.size(); + } return true; }