From 8044c1a1b0ab2a8a5c569831dbe332b05ddb0ec6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 1 Nov 2022 13:50:44 +0800 Subject: [PATCH 01/22] Add ut f --- .../Page/V3/tests/gtest_page_storage.cpp | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 9a63d28a126..8d60175d336 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1754,6 +1754,67 @@ try } CATCH +TEST_F(PageStorageTest, FullGCAfterWALCompact) +try +{ + DB::UInt64 tag = 0; + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + for (size_t i = 0; i < buf_sz; ++i) + { + c_buff[i] = i % 0xff; + } + + PageId page_id1 = 101; + { + WriteBatch batch; + batch.putPage(page_id1, tag, std::make_shared(c_buff, buf_sz), buf_sz); + batch.putPage(page_id1, tag, std::make_shared(c_buff, buf_sz), buf_sz); + batch.putPage(page_id1, tag, std::make_shared(c_buff, buf_sz), buf_sz); + batch.putPage(page_id1, tag, std::make_shared(c_buff, buf_sz), buf_sz); + page_storage->write(std::move(batch)); + } + // write until there are more than one wal file + PageId page_id2 = 102; + while (getLogFileNum() <= 1) + { + WriteBatch batch; + batch.putPage(page_id2, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); + page_storage->write(std::move(batch)); + } + + { + // acquire a snapshot to prevent page 1 from being removed + auto snap = page_storage->getSnapshot("t0"); + { + WriteBatch batch; + batch.delPage(page_id1); + batch.delPage(page_id2); + page_storage->write(std::move(batch)); + } + + // let's compact the WAL logs + auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); + ASSERT_TRUE(done_snapshot); + + // let's full gc, this will move page 1 and 2 to a new BlobFile + auto done_full_gc = page_storage->gcImpl(true, nullptr, nullptr); + ASSERT_TRUE(done_full_gc); + 
+ snap.reset(); + } + + LOG_INFO(log, "gc after snapshot released"); + // free snap then page 1 and 2 are removed from disk + page_storage->gcImpl(true, nullptr, nullptr); + + LOG_INFO(log, "restore from disk"); + page_storage.reset(); + page_storage = reopenWithConfig(config); + page_storage->read(page_id1); +} +CATCH + TEST_F(PageStorageTest, ReloadConfig) try { From 3b790ab9115595aa1578b1b20681c5256024f890 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 1 Nov 2022 18:40:21 +0800 Subject: [PATCH 02/22] Only full gc when id is valid --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 59 ++++++++++++++------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 46a719c2f84..7898d26d97c 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -468,32 +468,55 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( PageIdV3Internal page_id, std::map & blob_versioned_entries) { + // `blob_versioned_entries`: // blob_file_0, [, - // , // ] // blob_file_1, [...] // ... 
- // the total entries size taken out - PageSize total_entries_size = 0; + auto page_lock = acquireLock(); - if (type == EditRecordType::VAR_ENTRY) + if (type != EditRecordType::VAR_ENTRY) + return 0; + + assert(type == EditRecordType::VAR_ENTRY); + // ignore the last delete + bool exist_delete = false; + auto iter = entries.rbegin(); + while (iter != entries.rend()) { - for (const auto & [ver, entry_or_del] : entries) - { - if (!entry_or_del.isEntry()) - { - continue; - } + if (iter->second.isEntry()) + break; + assert(iter->second.isDelete()); + exist_delete = true; + ++iter; + } + if (iter == entries.rend()) + return 0; - const auto & entry = entry_or_del.entry; - if (blob_ids.count(entry.file_id) > 0) - { - blob_versioned_entries[entry.file_id].emplace_back(page_id, ver, entry); - total_entries_size += entry.size; - } - } + assert(iter->second.isEntry()); + const auto & last_entry = iter->second; + bool need_full_gc = false; + if (last_entry.being_ref_count == 1) + { + // This PageEntry is not shared by any other page_id, + need_full_gc = !exist_delete; + } + else + { + // This PageEntry is shared by another page_id, we + // need to move the latest entry to another BlobFile + // in order to reduce space amplification. 
+ need_full_gc = true; + } + + // The total entries size that will be moved + PageSize entry_size_full_gc = 0; + if (need_full_gc && blob_ids.count(last_entry.entry.file_id) > 0) + { + blob_versioned_entries[last_entry.entry.file_id].emplace_back(page_id, /* ver */ iter->first, last_entry.entry); + entry_size_full_gc += last_entry.entry.size; } - return total_entries_size; + return entry_size_full_gc; } bool VersionedPageEntries::cleanOutdatedEntries( From 1df49eef513a70a92c91cdfdcdeed89b8b2e01ae Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 2 Nov 2022 16:52:01 +0800 Subject: [PATCH 03/22] Add test case --- dbms/src/Common/FailPoint.cpp | 3 +- .../tests/gtest_encryption_test.cpp | 18 +- dbms/src/Storages/Page/V3/BlobStore.cpp | 7 +- dbms/src/Storages/Page/V3/PageDirectory.cpp | 7 - dbms/src/Storages/Page/V3/PageStorageImpl.cpp | 13 +- .../Page/V3/tests/gtest_page_storage.cpp | 101 +------- .../Page/V3/tests/gtest_page_storage.h | 77 ++++++ .../Page/V3/tests/gtest_page_storage_gc.cpp | 245 ++++++++++++++++++ 8 files changed, 355 insertions(+), 116 deletions(-) create mode 100644 dbms/src/Storages/Page/V3/tests/gtest_page_storage.h create mode 100644 dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 3ebfdbe4358..8c2e838bf75 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -69,7 +69,8 @@ std::unordered_map> FailPointHelper::f M(exception_mpp_hash_build) \ M(exception_before_drop_segment) \ M(exception_after_drop_segment) \ - M(exception_between_schema_change_in_the_same_diff) + M(exception_between_schema_change_in_the_same_diff) \ + M(force_ps_wal_compact) #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ diff --git a/dbms/src/Encryption/tests/gtest_encryption_test.cpp b/dbms/src/Encryption/tests/gtest_encryption_test.cpp index 2fa20631cbd..08f04f392b7 100644 --- a/dbms/src/Encryption/tests/gtest_encryption_test.cpp +++ 
b/dbms/src/Encryption/tests/gtest_encryption_test.cpp @@ -216,14 +216,24 @@ TEST_P(EncryptionTest, EncryptionTest) EXPECT_TRUE(testEncryption(16, 16 * 2, test::IV_OVERFLOW_FULL)); } +INSTANTIATE_TEST_CASE_P( + EncryptionTestInstance, + EncryptionTest, + testing::Combine( + testing::Bool(), + testing::Values( + EncryptionMethod::Aes128Ctr, + EncryptionMethod::Aes192Ctr, + EncryptionMethod::Aes256Ctr, #if USE_GM_SSL -INSTANTIATE_TEST_CASE_P(EncryptionTestInstance, EncryptionTest, testing::Combine(testing::Bool(), testing::Values(EncryptionMethod::Aes128Ctr, EncryptionMethod::Aes192Ctr, EncryptionMethod::Aes256Ctr, EncryptionMethod::SM4Ctr))); + EncryptionMethod::SM4Ctr #elif OPENSSL_VERSION_NUMBER < 0x1010100fL || defined(OPENSSL_NO_SM4) -INSTANTIATE_TEST_CASE_P(EncryptionTestInstance, EncryptionTest, testing::Combine(testing::Bool(), testing::Values(EncryptionMethod::Aes128Ctr, EncryptionMethod::Aes192Ctr, EncryptionMethod::Aes256Ctr))); +// not support SM4 #else -// Openssl support SM4 after 1.1.1 release version. -INSTANTIATE_TEST_CASE_P(EncryptionTestInstance, EncryptionTest, testing::Combine(testing::Bool(), testing::Values(EncryptionMethod::Aes128Ctr, EncryptionMethod::Aes192Ctr, EncryptionMethod::Aes256Ctr, EncryptionMethod::SM4Ctr))); + // Openssl support SM4 after 1.1.1 release version. 
+ EncryptionMethod::SM4Ctr #endif + ))); TEST(PosixWritableFileTest, test) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index d30ad6f3cb9..c7977761d6d 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -34,6 +34,7 @@ #include #include #include +#include "Common/formatReadable.h" namespace ProfileEvents { @@ -1005,7 +1006,7 @@ std::vector BlobStore::getGCStats() // Check if GC is required if (stat->sm_valid_rate <= config.heavy_gc_valid_rate) { - LOG_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, Need do compact GC", stat->id, stat->sm_valid_rate); + LOG_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, full GC", stat->id, stat->sm_valid_rate); blob_need_gc.emplace_back(stat->id); // Change current stat to read only @@ -1015,7 +1016,7 @@ std::vector BlobStore::getGCStats() else { blobstore_gc_info.appendToNoNeedGCBlob(stat->id, stat->sm_valid_rate); - LOG_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, No need to GC.", stat->id, stat->sm_valid_rate); + LOG_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, no need to GC", stat->id, stat->sm_valid_rate); } if (right_margin != stat->sm_total_size) @@ -1049,7 +1050,7 @@ PageEntriesEdit BlobStore::gc(std::map & { throw Exception("BlobStore can't do gc if nothing need gc.", ErrorCodes::LOGICAL_ERROR); } - LOG_INFO(log, "BlobStore gc will migrate {:.2f}MB into new Blobs", (1.0 * total_page_size / DB::MB)); + LOG_INFO(log, "BlobStore gc will migrate {} into new blob files", formatReadableSizeWithBinarySuffix(total_page_size)); auto write_blob = [this, total_page_size, &written_blobs, &write_limiter](const BlobFileId & file_id, char * data_begin, diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 7898d26d97c..ae88d7f716f 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -1301,13 
+1301,6 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con break; } } - for (const auto blob_id : blob_ids) - { - if (blob_versioned_entries.find(blob_id) == blob_versioned_entries.end()) - { - throw Exception(fmt::format("Can't get any entries from [blob_id={}]", blob_id)); - } - } LOG_INFO(log, "Get entries by blob ids done. [total_page_size={}] [total_page_nums={}]", // total_page_size, // diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index 04f875f28fa..a6c6e40864d 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -35,6 +36,10 @@ namespace ErrorCodes { extern const int NOT_IMPLEMENTED; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char force_ps_wal_compact[]; +} namespace PS::V3 { PageStorageImpl::PageStorageImpl( @@ -407,9 +412,13 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & GCTimeStatistics statistics; + // TODO: rewrite the GC process and split it into smaller interface + bool force_wal_compact = false; + fiu_do_on(FailPoints::force_ps_wal_compact, { force_wal_compact = true; }); + // 1. Do the MVCC gc, clean up expired snapshot. // And get the expired entries. - if (page_directory->tryDumpSnapshot(read_limiter, write_limiter)) + if (page_directory->tryDumpSnapshot(read_limiter, write_limiter, force_wal_compact)) { GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment(); } @@ -418,6 +427,8 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & const auto & del_entries = page_directory->gcInMemEntries(); statistics.gc_in_mem_entries_ms = gc_watch.elapsedMillisecondsFromLastTime(); + SYNC_FOR("before_PageStorageImpl::doGC_fullGC"); + // 2. Remove the expired entries in BlobStore. // It won't delete the data on the disk. 
// It will only update the SpaceMap which in memory. diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 8d60175d336..7d1595a579f 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -49,45 +50,6 @@ extern const char force_set_page_file_write_errno[]; namespace PS::V3::tests { -class PageStorageTest : public DB::base::TiFlashStorageTestBasic -{ -public: - void SetUp() override - { - TiFlashStorageTestBasic::SetUp(); - log = Logger::get(); - auto path = getTemporaryPath(); - createIfNotExist(path); - file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); - delegator = std::make_shared(path); - page_storage = std::make_shared("test.t", delegator, config, file_provider); - page_storage->restore(); - } - - std::shared_ptr reopenWithConfig(const PageStorageConfig & config_) - { - auto path = getTemporaryPath(); - delegator = std::make_shared(path); - auto storage = std::make_shared("test.t", delegator, config_, file_provider); - storage->restore(); - return storage; - } - - size_t getLogFileNum() - { - auto log_files = WALStoreReader::listAllFiles(delegator, log); - return log_files.size(); - } - -protected: - FileProviderPtr file_provider; - std::unique_ptr path_pool; - PSDiskDelegatorPtr delegator; - PageStorageConfig config; - std::shared_ptr page_storage; - - LoggerPtr log; -}; TEST_F(PageStorageTest, WriteRead) try @@ -1754,67 +1716,6 @@ try } CATCH -TEST_F(PageStorageTest, FullGCAfterWALCompact) -try -{ - DB::UInt64 tag = 0; - const size_t buf_sz = 1024; - char c_buff[buf_sz]; - for (size_t i = 0; i < buf_sz; ++i) - { - c_buff[i] = i % 0xff; - } - - PageId page_id1 = 101; - { - WriteBatch batch; - batch.putPage(page_id1, tag, std::make_shared(c_buff, buf_sz), buf_sz); - batch.putPage(page_id1, tag, 
std::make_shared(c_buff, buf_sz), buf_sz); - batch.putPage(page_id1, tag, std::make_shared(c_buff, buf_sz), buf_sz); - batch.putPage(page_id1, tag, std::make_shared(c_buff, buf_sz), buf_sz); - page_storage->write(std::move(batch)); - } - // write until there are more than one wal file - PageId page_id2 = 102; - while (getLogFileNum() <= 1) - { - WriteBatch batch; - batch.putPage(page_id2, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); - page_storage->write(std::move(batch)); - } - - { - // acquire a snapshot to prevent page 1 from being removed - auto snap = page_storage->getSnapshot("t0"); - { - WriteBatch batch; - batch.delPage(page_id1); - batch.delPage(page_id2); - page_storage->write(std::move(batch)); - } - - // let's compact the WAL logs - auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); - ASSERT_TRUE(done_snapshot); - - // let's full gc, this will move page 1 and 2 to a new BlobFile - auto done_full_gc = page_storage->gcImpl(true, nullptr, nullptr); - ASSERT_TRUE(done_full_gc); - - snap.reset(); - } - - LOG_INFO(log, "gc after snapshot released"); - // free snap then page 1 and 2 are removed from disk - page_storage->gcImpl(true, nullptr, nullptr); - - LOG_INFO(log, "restore from disk"); - page_storage.reset(); - page_storage = reopenWithConfig(config); - page_storage->read(page_id1); -} -CATCH - TEST_F(PageStorageTest, ReloadConfig) try { diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.h b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.h new file mode 100644 index 00000000000..1b197a2e160 --- /dev/null +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.h @@ -0,0 +1,77 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB +{ +namespace PS::V3::tests +{ +class PageStorageTest : public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + + for (size_t i = 0; i < buf_sz; ++i) + c_buff[i] = i % 0xff; + + log = Logger::get(); + auto path = getTemporaryPath(); + createIfNotExist(path); + file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); + delegator = std::make_shared(path); + page_storage = std::make_shared(String(NAME), delegator, config, file_provider); + page_storage->restore(); + } + + std::shared_ptr reopenWithConfig(const PageStorageConfig & config_) + { + auto path = getTemporaryPath(); + delegator = std::make_shared(path); + auto storage = std::make_shared(String(NAME), delegator, config_, file_provider); + storage->restore(); + return storage; + } + + size_t getLogFileNum() + { + auto log_files = WALStoreReader::listAllFiles(delegator, log); + return log_files.size(); + } + + ReadBufferPtr getDefaultBuffer() const + { + return std::make_shared(c_buff, buf_sz); + } + +protected: + FileProviderPtr file_provider; + PSDiskDelegatorPtr delegator; + PageStorageConfig config; + std::shared_ptr page_storage; + + static constexpr std::string_view NAME{"test.t"}; + DB::UInt64 default_tag = 0; + constexpr static size_t buf_sz = 1024; + char c_buff[buf_sz]; + + LoggerPtr log; +}; +} // namespace PS::V3::tests +} // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp 
b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp new file mode 100644 index 00000000000..7a3cfda3f44 --- /dev/null +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp @@ -0,0 +1,245 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace FailPoints +{ +extern const char force_ps_wal_compact[]; +} +namespace PS::V3::tests +{ + +struct TestParam +{ + bool keep_snap; + bool has_ref; + + explicit TestParam(const std::tuple & t) + : keep_snap(std::get<0>(t)) + , has_ref(std::get<1>(t)) + { + } +}; + +class PageStorageFullGCTestWithParam + : public PageStorageTest + , public testing::WithParamInterface> +{ +public: + PageStorageFullGCTestWithParam() + : test_param(GetParam()) + { + } + + void SetUp() override + { + PageStorageTest::SetUp(); + } + +protected: + TestParam test_param; +}; + +TEST_P(PageStorageFullGCTestWithParam, DontMoveDeletedPageId) +try +{ + // always pick all blob file for full gc + PageStorageConfig new_config; + new_config.blob_heavy_gc_valid_rate = 1.0; + page_storage->reloadSettings(new_config); + + PageId page_id1 = 101; + PageId ref_page_id = 102; + { + WriteBatch batch; + batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); + if (test_param.has_ref) + batch.putRefPage(ref_page_id, page_id1); + page_storage->write(std::move(batch)); + 
} + + // If the page id is logically delete before `tryDumpSnapshot`, then + // full gc won't rewrite + { + WriteBatch batch; + batch.delPage(page_id1); + if (test_param.has_ref) + batch.delPage(ref_page_id); + page_storage->write(std::move(batch)); + } + + { + PageStorageSnapshotPtr snap; + if (test_param.keep_snap) + snap = page_storage->getSnapshot(""); + + // let's compact the WAL logs + auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); + ASSERT_TRUE(done_snapshot); + + // let's try full gc, this will not trigger full gc + auto done_full_gc = page_storage->gcImpl(true, nullptr, nullptr); + ASSERT_FALSE(done_full_gc); + } +} +CATCH + +INSTANTIATE_TEST_CASE_P( + Group, + PageStorageFullGCTestWithParam, + ::testing::Combine( + ::testing::Bool(), + ::testing::Bool())); + + +TEST_F(PageStorageTest, DeletePageIdAfterWALCompact) +try +{ + // always pick all blob file for full gc + PageStorageConfig new_config; + new_config.blob_heavy_gc_valid_rate = 1.0; + page_storage->reloadSettings(new_config); + + PageId page_id1 = 101; + { + WriteBatch batch; + batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); + page_storage->write(std::move(batch)); + } + + FailPointHelper::enableFailPoint(FailPoints::force_ps_wal_compact); + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_fullGC"); + auto th_gc = std::async([&]() { + auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); + ASSERT_FALSE(done_full_gc); + }); + // let's compact the WAL logs + sp_gc.waitAndPause(); + + // If page is logically delete between `tryDumpSnapshot` and `fullGC`, then + // we should able to handle the "upsert after delete" on disk + { + WriteBatch batch; + batch.delPage(page_id1); + page_storage->write(std::move(batch)); + } + + // let's try full gc, this will not trigger full gc + sp_gc.next(); + th_gc.get(); + + // wal compact again + 
page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); + + LOG_INFO(log, "close and restore WAL from disk"); + page_storage.reset(); + + // There is no any entry on wal log-files + auto [wal, reader] = WALStore::create(String(NAME), file_provider, delegator, WALConfig::from(new_config)); + UNUSED(wal); + size_t num_entries_on_wal = 0; + while (reader->remained()) + { + auto s = reader->next(); + if (s.has_value()) + { + auto e = ser::deserializeFrom(s.value()); + num_entries_on_wal += e.size(); + EXPECT_TRUE(e.empty()); + } + } + ASSERT_EQ(num_entries_on_wal, 0); +} +CATCH + +TEST_F(PageStorageTest, DeleteRefPageIdAfterWALCompact) +try +{ + // always pick all blob file for full gc + PageStorageConfig new_config; + new_config.blob_heavy_gc_valid_rate = 1.0; + page_storage->reloadSettings(new_config); + + PageId page_id1 = 101; + PageId ref_page_id = 102; + { + WriteBatch batch; + batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); + batch.putRefPage(ref_page_id, page_id1); + batch.delPage(page_id1); + page_storage->write(std::move(batch)); + } + + FailPointHelper::enableFailPoint(FailPoints::force_ps_wal_compact); + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_fullGC"); + auto th_gc = std::async([&]() { + auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); + ASSERT_TRUE(done_full_gc); // note that full gc is executed + }); + // let's compact the WAL logs + sp_gc.waitAndPause(); + + // If page is logically delete between `tryDumpSnapshot` and `fullGC`, then + // we should able to handle the "upsert after delete" on disk + { + WriteBatch batch; + batch.delPage(ref_page_id); + page_storage->write(std::move(batch)); + } + + // let's try full gc, unlike `DeletePageIdAfterWALCompact`, + // this ** will ** trigger full gc + sp_gc.next(); + th_gc.get(); + + // wal compact again !!!! 
FIXME: this will throw exception + page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); + + LOG_INFO(log, "close and restore WAL from disk"); + page_storage.reset(); + + // There is no any entry on wal log-files + auto [wal, reader] = WALStore::create(String(NAME), file_provider, delegator, WALConfig::from(new_config)); + UNUSED(wal); + size_t num_entries_on_wal = 0; + while (reader->remained()) + { + auto s = reader->next(); + if (s.has_value()) + { + auto e = ser::deserializeFrom(s.value()); + num_entries_on_wal += e.size(); + EXPECT_TRUE(e.empty()); + } + } + ASSERT_EQ(num_entries_on_wal, 0); +} +CATCH + +} // namespace PS::V3::tests +} // namespace DB From 396f489b9fd12c34bf3f1ec4f5dcb8ce31b42819 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 2 Nov 2022 18:11:41 +0800 Subject: [PATCH 04/22] getEntriesByBlobIds support rewrite ref-pages --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 116 ++++++++++-------- dbms/src/Storages/Page/V3/PageDirectory.h | 5 +- .../Storages/Page/V3/PageDirectoryFactory.cpp | 5 +- .../Page/V3/tests/gtest_versioned_entries.cpp | 24 ++-- 4 files changed, 89 insertions(+), 61 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index ae88d7f716f..1ae5a5ce295 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -374,13 +374,15 @@ std::optional VersionedPageEntries::getEntry(UInt64 seq) const return std::nullopt; } -std::optional VersionedPageEntries::getLastEntry() const +std::optional VersionedPageEntries::getLastEntry(std::optional seq) const { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) { for (auto it_r = entries.rbegin(); it_r != entries.rend(); it_r++) { + if (seq.has_value() && it_r->first.sequence > seq.value()) + continue; if (it_r->second.isEntry()) { return it_r->second.entry; @@ -466,7 +468,8 @@ Int64 VersionedPageEntries::incrRefCount(const PageVersion & 
ver) PageSize VersionedPageEntries::getEntriesByBlobIds( const std::unordered_set & blob_ids, PageIdV3Internal page_id, - std::map & blob_versioned_entries) + std::map & blob_versioned_entries, + std::map> & ref_ids_maybe_rewrite) { // `blob_versioned_entries`: // blob_file_0, [, @@ -475,43 +478,31 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( // ... auto page_lock = acquireLock(); + if (type == EditRecordType::VAR_REF) + { + if (is_deleted) + { + ref_ids_maybe_rewrite[page_id] = {ori_page_id, create_ver}; + } + return 0; + } + if (type != EditRecordType::VAR_ENTRY) return 0; assert(type == EditRecordType::VAR_ENTRY); - // ignore the last delete - bool exist_delete = false; + // Empty or already deleted + if (entries.empty()) + return 0; auto iter = entries.rbegin(); - while (iter != entries.rend()) - { - if (iter->second.isEntry()) - break; - assert(iter->second.isDelete()); - exist_delete = true; - ++iter; - } - if (iter == entries.rend()) + if (iter->second.isDelete()) return 0; assert(iter->second.isEntry()); - const auto & last_entry = iter->second; - bool need_full_gc = false; - if (last_entry.being_ref_count == 1) - { - // This PageEntry is not shared by any other page_id, - need_full_gc = !exist_delete; - } - else - { - // This PageEntry is shared by another page_id, we - // need to move the latest entry to another BlobFile - // in order to reduce space amplification. 
- need_full_gc = true; - } - // The total entries size that will be moved PageSize entry_size_full_gc = 0; - if (need_full_gc && blob_ids.count(last_entry.entry.file_id) > 0) + const auto & last_entry = iter->second; + if (blob_ids.count(last_entry.entry.file_id) > 0) { blob_versioned_entries[last_entry.entry.file_id].emplace_back(page_id, /* ver */ iter->first, last_entry.entry); entry_size_full_gc += last_entry.entry.size; @@ -1271,34 +1262,61 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con std::map blob_versioned_entries; PageSize total_page_size = 0; + UInt64 total_page_nums = 0; + std::map> ref_ids_maybe_rewrite; - MVCCMapType::const_iterator iter; { - std::shared_lock read_lock(table_rw_mutex); - iter = mvcc_table_directory.cbegin(); - if (iter == mvcc_table_directory.end()) - return {blob_versioned_entries, total_page_size}; - } + MVCCMapType::const_iterator iter; + { + std::shared_lock read_lock(table_rw_mutex); + iter = mvcc_table_directory.cbegin(); + if (iter == mvcc_table_directory.end()) + return {blob_versioned_entries, total_page_size}; + } - UInt64 total_page_nums = 0; - while (true) - { - // `iter` is an iter that won't be invalid cause by `apply`/`gcApply`. - // do scan on the version list without lock on `mvcc_table_directory`. - auto page_id = iter->first; - const auto & version_entries = iter->second; - auto single_page_size = version_entries->getEntriesByBlobIds(blob_id_set, page_id, blob_versioned_entries); - total_page_size += single_page_size; - if (single_page_size != 0) + while (true) { - total_page_nums++; + // `iter` is an iter that won't be invalid cause by `apply`/`gcApply`. + // do scan on the version list without lock on `mvcc_table_directory`. 
+ auto page_id = iter->first; + const auto & version_entries = iter->second; + auto single_page_size = version_entries->getEntriesByBlobIds(blob_id_set, page_id, blob_versioned_entries, ref_ids_maybe_rewrite); + total_page_size += single_page_size; + if (single_page_size != 0) + { + total_page_nums++; + } + + { + std::shared_lock read_lock(table_rw_mutex); + iter++; + if (iter == mvcc_table_directory.end()) + break; + } } + } + for (const auto & [ref_id, ori_id_ver] : ref_ids_maybe_rewrite) + { + const auto ori_id = std::get<0>(ori_id_ver); + const auto ver = std::get<1>(ori_id_ver); + MVCCMapType::const_iterator page_iter; { std::shared_lock read_lock(table_rw_mutex); - iter++; - if (iter == mvcc_table_directory.end()) - break; + page_iter = mvcc_table_directory.find(ori_id); + RUNTIME_CHECK(page_iter != mvcc_table_directory.end(), ref_id, ori_id, ver); + } + const auto & version_entries = page_iter->second; + // the latest entry with version.seq <= ref_id.create_ver.seq + auto entry = version_entries->getLastEntry(ver.sequence); + RUNTIME_CHECK(entry.has_value(), ref_id, ori_id, ver); + // If the being-ref entry lays on the full gc candidate blobfiles, then we + // need to rewrite the ref-id to a normal page. 
+ if (blob_id_set.count(entry->file_id) > 0) + { + blob_versioned_entries[entry->file_id].emplace_back(ref_id, ver, *entry); + total_page_size += entry->size; + total_page_nums += 1; } } diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index c8f6864aeed..0393d48709b 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -177,7 +177,7 @@ class VersionedPageEntries std::optional getEntry(UInt64 seq) const; - std::optional getLastEntry() const; + std::optional getLastEntry(std::optional seq) const; bool isVisible(UInt64 seq) const; @@ -189,7 +189,8 @@ class VersionedPageEntries PageSize getEntriesByBlobIds( const std::unordered_set & blob_ids, PageIdV3Internal page_id, - std::map & blob_versioned_entries); + std::map & blob_versioned_entries, + std::map> & ref_ids_maybe_rewrite); /** * Given a `lowest_seq`, this will clean all outdated entries before `lowest_seq`. diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 307d732ce79..c56fb5afc2b 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -20,6 +20,7 @@ #include #include +#include namespace DB { @@ -62,7 +63,7 @@ PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WAL // We should restore the entry to `blob_stats` even if it is marked as "deleted", // or we will mistakenly reuse the space to write other blobs down into that space. // So we need to use `getLastEntry` instead of `getEntry(version)` here. 
- if (auto entry = entries->getLastEntry(); entry) + if (auto entry = entries->getLastEntry(std::nullopt); entry) { blob_stats->restoreByEntry(*entry); } @@ -102,7 +103,7 @@ PageDirectoryPtr PageDirectoryFactory::createFromEdit(String storage_name, FileP // We should restore the entry to `blob_stats` even if it is marked as "deleted", // or we will mistakenly reuse the space to write other blobs down into that space. // So we need to use `getLastEntry` instead of `getEntry(version)` here. - if (auto entry = entries->getLastEntry(); entry) + if (auto entry = entries->getLastEntry(std::nullopt); entry) { blob_stats->restoreByEntry(*entry); } diff --git a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp index b95941153e0..a4225f87550 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp @@ -578,7 +578,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({/*empty*/}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({/*empty*/}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 0); ASSERT_EQ(total_size, 0); @@ -586,8 +587,9 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) { std::map blob_entries; + std::map> rewrite; const BlobFileId blob_id = 1; - PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 1); ASSERT_EQ(blob_entries[blob_id].size(), 4); @@ -597,8 +599,9 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) { std::map blob_entries; + std::map> rewrite; const BlobFileId blob_id = 2; - PageSize 
total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 1); ASSERT_EQ(blob_entries[blob_id].size(), 2); @@ -608,8 +611,9 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) { std::map blob_entries; + std::map> rewrite; const BlobFileId blob_id = 3; - PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 1); ASSERT_EQ(blob_entries[blob_id].size(), 2); @@ -620,7 +624,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) // {1, 2} { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({1, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({1, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 2); ASSERT_EQ(blob_entries[1].size(), 4); @@ -633,7 +638,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) // {2, 3} { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 2); ASSERT_EQ(blob_entries[2].size(), 2); @@ -646,7 +652,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) // {1, 2, 3} { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), 
blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 3); ASSERT_EQ(blob_entries[1].size(), 4); @@ -661,7 +668,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) // {1, 2, 3, 100}; blob_id 100 is not exist in actual { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2, 4}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2, 4}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 3); // 100 not exist ASSERT_EQ(blob_entries.find(100), blob_entries.end()); From d883ea1a2da5ca03d5032918696eef45ec6c40e4 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 2 Nov 2022 18:39:20 +0800 Subject: [PATCH 05/22] Apply rewrite ref-page to normal page --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 23 +++++++++++++++++++++ dbms/src/Storages/Page/V3/PageDirectory.h | 2 +- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 1ae5a5ce295..385386f75e5 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -108,6 +108,29 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt return; } + if (type == EditRecordType::VAR_REF) + { + // an ref-page is rewritten into a normal page + if (!is_deleted) + { + // Full gc commit, we need to rewrite this page to + // be normal page with upsert-entry. + // TODO: Also we need to decrease the ref-count of ori_page_id. + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + } + else + { + // The ref-id is deleted before full gc commit, + // we need to rewrite this page to be normal page + // with upsert-entry and a delete. 
+ entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + entries.emplace(delete_ver, EntryOrDelete::newDelete()); + } + is_deleted = false; + type = EditRecordType::VAR_ENTRY; + return; + } + throw Exception( fmt::format("try to create entry version with invalid state " "[ver={}] [entry={}] [state={}]", diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 0393d48709b..a147260c2da 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -57,7 +57,7 @@ class PageDirectorySnapshot : public DB::PageStorageSnapshot CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots); } - ~PageDirectorySnapshot() + ~PageDirectorySnapshot() override { CurrentMetrics::sub(CurrentMetrics::PSMVCCNumSnapshots); } From b94f9b1b6a851e53bb8b63b35d52e21d7bb7f9e1 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 2 Nov 2022 22:39:12 +0800 Subject: [PATCH 06/22] Decrease ref-count after apply upsert on ref-page --- dbms/src/Storages/Page/V3/BlobStore.cpp | 2 +- dbms/src/Storages/Page/V3/PageDirectory.cpp | 86 +++++++++++++++++-- dbms/src/Storages/Page/V3/PageDirectory.h | 15 +++- .../Storages/Page/V3/PageDirectoryFactory.cpp | 11 ++- .../Page/V3/tests/gtest_versioned_entries.cpp | 2 +- 5 files changed, 101 insertions(+), 15 deletions(-) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index c7977761d6d..9859696bd65 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -34,7 +35,6 @@ #include #include #include -#include "Common/formatReadable.h" namespace ProfileEvents { diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 385386f75e5..4c85ed2643c 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ 
-31,6 +31,7 @@ #include #include #include +#include #include #include @@ -73,6 +74,7 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt if (type == EditRecordType::VAR_DELETE) { type = EditRecordType::VAR_ENTRY; + assert(entries.empty()); entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); return; } @@ -108,6 +110,59 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt return; } + throw Exception( + fmt::format("try to create entry version with invalid state " + "[ver={}] [entry={}] [state={}]", + ver, + ::DB::PS::V3::toDebugString(entry), + toDebugString()), + ErrorCodes::PS_DIR_APPLY_INVALID_STATUS); +} + +PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver, const PageEntryV3 & entry) +{ + auto page_lock = acquireLock(); + + if (type == EditRecordType::VAR_DELETE) + { + type = EditRecordType::VAR_ENTRY; + assert(entries.empty()); + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + return buildV3Id(TEST_NAMESPACE_ID, INVALID_PAGE_ID); + } + + if (type == EditRecordType::VAR_ENTRY) + { + auto last_iter = MapUtils::findLess(entries, PageVersion(ver.sequence + 1, 0)); + if (last_iter == entries.end()) + { + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + } + else if (last_iter->second.isDelete()) + { + // append after delete + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + } + else + { + assert(last_iter->second.isEntry()); + // It is ok to replace the entry with same sequence and newer epoch, but not valid + // to replace the entry with newer sequence. 
+ if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence)) + { + throw Exception( + fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]", + ver, + last_iter->first, + last_iter->second.toDebugString()), + ErrorCodes::LOGICAL_ERROR); + } + // create a new version that inherit the `being_ref_count` of the last entry + entries.emplace(ver, EntryOrDelete::newReplacingEntry(last_iter->second, entry)); + } + return buildV3Id(TEST_NAMESPACE_ID, INVALID_PAGE_ID); + } + if (type == EditRecordType::VAR_REF) { // an ref-page is rewritten into a normal page @@ -117,6 +172,9 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt // be normal page with upsert-entry. // TODO: Also we need to decrease the ref-count of ori_page_id. entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + is_deleted = false; + type = EditRecordType::VAR_ENTRY; + return ori_page_id; } else { @@ -125,14 +183,14 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt // with upsert-entry and a delete. 
entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); entries.emplace(delete_ver, EntryOrDelete::newDelete()); + is_deleted = false; + type = EditRecordType::VAR_ENTRY; + return ori_page_id; } - is_deleted = false; - type = EditRecordType::VAR_ENTRY; - return; } throw Exception( - fmt::format("try to create entry version with invalid state " + fmt::format("try to create upsert entry version with invalid state " "[ver={}] [entry={}] [state={}]", ver, ::DB::PS::V3::toDebugString(entry), @@ -672,6 +730,8 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag } iter->second.being_ref_count -= deref_count; + if (lowest_seq == 0) + return false; // Clean outdated entries after decreased the ref-counter // set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock); @@ -1261,15 +1321,23 @@ void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiter { std::shared_lock read_lock(table_rw_mutex); iter = mvcc_table_directory.find(record.page_id); - if (unlikely(iter == mvcc_table_directory.end())) - { - throw Exception(fmt::format("Can't find [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR); - } + RUNTIME_CHECK_MSG(iter != mvcc_table_directory.end(), "Can't find [page_id={}] while doing gcApply", record.page_id); } // release the read lock on `table_rw_mutex` // Append the gc version to version list const auto & versioned_entries = iter->second; - versioned_entries->createNewEntry(record.version, record.entry); + auto id_to_deref = versioned_entries->createUpsertEntry(record.version, record.entry); + if (id_to_deref.low != INVALID_PAGE_ID) + { + MVCCMapType::const_iterator deref_iter; + { + std::shared_lock read_lock(table_rw_mutex); + deref_iter = mvcc_table_directory.find(id_to_deref); + RUNTIME_CHECK_MSG(deref_iter != mvcc_table_directory.end(), "Can't find 
[page_id={}] to deref after gcApply", id_to_deref); + } + auto deref_res = deref_iter->second->derefAndClean(/*lowest_seq*/ 0, id_to_deref, record.version, 1, nullptr); + RUNTIME_ASSERT(!deref_res); + } } LOG_INFO(log, "GC apply done. [edit size={}]", migrated_edit.size()); diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index a147260c2da..610fccea2be 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -81,7 +81,7 @@ using PageDirectorySnapshotPtr = std::shared_ptr; struct EntryOrDelete { - bool is_delete; + bool is_delete = true; Int64 being_ref_count = 1; PageEntryV3 entry; @@ -162,6 +162,8 @@ class VersionedPageEntries void createNewEntry(const PageVersion & ver, const PageEntryV3 & entry); + PageIdV3Internal createUpsertEntry(const PageVersion & ver, const PageEntryV3 & entry); + bool createNewRef(const PageVersion & ver, PageIdV3Internal ori_page_id); std::shared_ptr createNewExternal(const PageVersion & ver); @@ -203,12 +205,19 @@ class VersionedPageEntries * * Return `true` iff this page can be totally removed from the whole `PageDirectory`. */ - bool cleanOutdatedEntries( + [[nodiscard]] bool cleanOutdatedEntries( UInt64 lowest_seq, std::map> * normal_entries_to_deref, PageEntriesV3 * entries_removed, const PageLock & page_lock); - bool derefAndClean( + /** + * Decrease the ref-count of entry with given `deref_ver`. + * If `lowest_seq` != 0, then it will run `cleanOutdatedEntries` after decreasing + * the ref-count. + * + * Return `true` iff this page can be totally removed from the whole `PageDirectory`. 
+ */ + [[nodiscard]] bool derefAndClean( UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index c56fb5afc2b..84900eb332b 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -185,9 +186,17 @@ void PageDirectoryFactory::applyRecord( restored_version); break; case EditRecordType::UPSERT: - version_list->createNewEntry(restored_version, r.entry); + { + auto id_to_deref = version_list->createUpsertEntry(restored_version, r.entry); + if (id_to_deref.low != INVALID_PAGE_ID) + { + auto deref_iter = dir->mvcc_table_directory.find(id_to_deref); + auto deref_res = deref_iter->second->derefAndClean(/*lowest_seq*/ 0, id_to_deref, restored_version, 1, nullptr); + RUNTIME_ASSERT(!deref_res); + } break; } + } } catch (DB::Exception & e) { diff --git a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp index a4225f87550..36351f2e7ee 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp @@ -45,7 +45,7 @@ namespace PS::V3::tests #define INSERT_ENTRY(VERSION) INSERT_BLOBID_ENTRY(1, VERSION) #define INSERT_GC_ENTRY(VERSION, EPOCH) \ PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = 100 * (VERSION) + (EPOCH), .padded_size = 0, .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ - entries.createNewEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); + entries.createUpsertEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); class VersionedEntriesTest : public ::testing::Test { From 
f78b8ae41bef383287aba8ecb084cffbb120e947 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 2 Nov 2022 23:18:08 +0800 Subject: [PATCH 07/22] Fix temp path --- dbms/src/Encryption/tests/gtest_rate_limiter.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp index 613e552def5..0d36e755323 100644 --- a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp +++ b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -385,9 +386,11 @@ TEST(IORateLimiterTest, IOStat) ASSERT_EQ(io_rate_limiter.bg_read_limiter, nullptr); ASSERT_EQ(io_rate_limiter.fg_read_limiter, nullptr); - std::string fname = "/tmp/rate_limit_io_stat_test"; + String dir = TiFlashTestEnv::getTemporaryPath(); + TiFlashTestEnv::tryRemovePath(dir, /*recreate*/ true); + String fname = dir + "/rate_limit_io_stat_test"; int fd = ::open(fname.c_str(), O_CREAT | O_RDWR | O_DIRECT, 0666); - ASSERT_GT(fd, 0) << strerror(errno); + ASSERT_GT(fd, 0) << fmt::format("path:{}, err: {}", fname, strerror(errno)); std::unique_ptr> defer_close(&fd, [](const int * fd) { ::close(*fd); }); void * buf = nullptr; @@ -410,6 +413,9 @@ TEST(IORateLimiterTest, IOStat) TEST(IORateLimiterTest, IOStatMultiThread) { + String dir = TiFlashTestEnv::getTemporaryPath(); + TiFlashTestEnv::tryRemovePath(dir, /*recreate*/ true); + std::mutex bg_pids_mtx; std::vector bg_pids; auto add_bg_pid = [&](pid_t tid) { @@ -427,9 +433,9 @@ TEST(IORateLimiterTest, IOStatMultiThread) { add_bg_pid(syscall(SYS_gettid)); } - std::string fname = "/tmp/rate_limit_io_stat_test_" + std::to_string(id) + (is_bg ? "_bg" : "_fg"); + String fname = fmt::format("{}/rate_limit_io_stat_test_{}_{}", dir, id, (is_bg ? 
"bg" : "fg")); int fd = ::open(fname.c_str(), O_CREAT | O_RDWR | O_DIRECT, 0666); - ASSERT_GT(fd, 0) << strerror(errno); + ASSERT_GT(fd, 0) << fmt::format("path:{}, err: {}", fname, strerror(errno)); std::unique_ptr> defer_close(&fd, [](const int * fd) { ::close(*fd); }); void * buf = nullptr; From 28e552e46e24654d0e53d4e5bfaf274c4e2052df Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 2 Nov 2022 23:29:55 +0800 Subject: [PATCH 08/22] Fix ut for getEntriesByBlobId --- .../Page/V3/tests/gtest_versioned_entries.cpp | 93 ++++--------------- 1 file changed, 17 insertions(+), 76 deletions(-) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp index 36351f2e7ee..0d079e70bb5 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp @@ -531,50 +531,10 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) PageId page_id = 100; auto check_for_blob_id_1 = [&](const PageIdAndVersionedEntries & entries) { auto it = entries.begin(); - - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 1); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v1); - - it++; - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 2); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v2); - - it++; - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 5); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v5); - - it++; ASSERT_EQ(std::get<0>(*it).low, page_id); ASSERT_EQ(std::get<1>(*it).sequence, 11); ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v11); }; - auto check_for_blob_id_2 = [&](const PageIdAndVersionedEntries & entries) { - auto it = entries.begin(); - - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 3); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v3); - - it++; - ASSERT_EQ(std::get<0>(*it).low, page_id); - 
ASSERT_EQ(std::get<1>(*it).sequence, 4); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v4); - }; - auto check_for_blob_id_3 = [&](const PageIdAndVersionedEntries & entries) { - auto it = entries.begin(); - - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 6); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v6); - - it++; - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 8); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v8); - }; { std::map blob_entries; @@ -592,8 +552,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 1); - ASSERT_EQ(blob_entries[blob_id].size(), 4); - ASSERT_EQ(total_size, 1 + 2 + 5 + 11); + ASSERT_EQ(blob_entries[blob_id].size(), 1); + ASSERT_EQ(total_size, 11); check_for_blob_id_1(blob_entries[blob_id]); } @@ -603,10 +563,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) const BlobFileId blob_id = 2; PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 1); - ASSERT_EQ(blob_entries[blob_id].size(), 2); - ASSERT_EQ(total_size, 3 + 4); - check_for_blob_id_2(blob_entries[blob_id]); + ASSERT_EQ(blob_entries.empty(), true); + ASSERT_EQ(total_size, 0); } { @@ -615,10 +573,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) const BlobFileId blob_id = 3; PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 1); - ASSERT_EQ(blob_entries[blob_id].size(), 2); - ASSERT_EQ(total_size, 6 + 8); - check_for_blob_id_3(blob_entries[blob_id]); + ASSERT_EQ(blob_entries.empty(), true); + ASSERT_EQ(total_size, 0); } // {1, 2} @@ -627,12 +583,10 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) std::map> rewrite; PageSize total_size = 
entries.getEntriesByBlobIds({1, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 2); - ASSERT_EQ(blob_entries[1].size(), 4); - ASSERT_EQ(blob_entries[2].size(), 2); - ASSERT_EQ(total_size, (1 + 2 + 5 + 11) + (3 + 4)); + ASSERT_EQ(blob_entries.size(), 1); + ASSERT_EQ(blob_entries[1].size(), 1); + ASSERT_EQ(total_size, 11); check_for_blob_id_1(blob_entries[1]); - check_for_blob_id_2(blob_entries[2]); } // {2, 3} @@ -641,12 +595,8 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) std::map> rewrite; PageSize total_size = entries.getEntriesByBlobIds({3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 2); - ASSERT_EQ(blob_entries[2].size(), 2); - ASSERT_EQ(blob_entries[3].size(), 2); - ASSERT_EQ(total_size, (6 + 8) + (3 + 4)); - check_for_blob_id_2(blob_entries[2]); - check_for_blob_id_3(blob_entries[3]); + ASSERT_EQ(blob_entries.empty(), true); + ASSERT_EQ(total_size, 0); } // {1, 2, 3} @@ -655,14 +605,10 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) std::map> rewrite; PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 3); - ASSERT_EQ(blob_entries[1].size(), 4); - ASSERT_EQ(blob_entries[2].size(), 2); - ASSERT_EQ(blob_entries[3].size(), 2); - ASSERT_EQ(total_size, (1 + 2 + 5 + 11) + (6 + 8) + (3 + 4)); + ASSERT_EQ(blob_entries.size(), 1); + ASSERT_EQ(blob_entries[1].size(), 1); + ASSERT_EQ(total_size, 11); check_for_blob_id_1(blob_entries[1]); - check_for_blob_id_2(blob_entries[2]); - check_for_blob_id_3(blob_entries[3]); } // {1, 2, 3, 100}; blob_id 100 is not exist in actual @@ -671,15 +617,10 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) std::map> rewrite; PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2, 4}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 3); // 100 not exist - 
ASSERT_EQ(blob_entries.find(100), blob_entries.end()); - ASSERT_EQ(blob_entries[1].size(), 4); - ASSERT_EQ(blob_entries[2].size(), 2); - ASSERT_EQ(blob_entries[3].size(), 2); - ASSERT_EQ(total_size, (1 + 2 + 5 + 11) + (6 + 8) + (3 + 4)); + ASSERT_EQ(blob_entries.size(), 1); + ASSERT_EQ(blob_entries[1].size(), 1); + ASSERT_EQ(total_size, 11); check_for_blob_id_1(blob_entries[1]); - check_for_blob_id_2(blob_entries[2]); - check_for_blob_id_3(blob_entries[3]); } } From 075114f8a81a5ea5fb66ab3a0a73ce31443c499b Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 00:46:19 +0800 Subject: [PATCH 09/22] Update ut --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 2 +- dbms/src/Storages/Page/V3/PageDirectory.h | 2 +- .../Storages/Page/V3/PageDirectoryFactory.cpp | 12 +- .../Storages/Page/V3/PageDirectoryFactory.h | 2 +- .../Page/V3/tests/gtest_page_directory.cpp | 146 ++++++++++++++---- .../Page/V3/tests/gtest_versioned_entries.cpp | 2 +- 6 files changed, 135 insertions(+), 31 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 4c85ed2643c..f36d3e28406 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -561,7 +561,7 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( auto page_lock = acquireLock(); if (type == EditRecordType::VAR_REF) { - if (is_deleted) + if (!is_deleted) { ref_ids_maybe_rewrite[page_id] = {ori_page_id, create_ver}; } diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 610fccea2be..6fb09672347 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -162,7 +162,7 @@ class VersionedPageEntries void createNewEntry(const PageVersion & ver, const PageEntryV3 & entry); - PageIdV3Internal createUpsertEntry(const PageVersion & ver, const PageEntryV3 & entry); + [[nodiscard]] PageIdV3Internal createUpsertEntry(const 
PageVersion & ver, const PageEntryV3 & entry); bool createNewRef(const PageVersion & ver, PageIdV3Internal ori_page_id); diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 84900eb332b..1e433674c8f 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -77,14 +77,24 @@ PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WAL return dir; } -PageDirectoryPtr PageDirectoryFactory::createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit) +// just for test +PageDirectoryPtr PageDirectoryFactory::createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, PageEntriesEdit & edit) { auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator, WALConfig()); (void)reader; PageDirectoryPtr dir = std::make_unique(std::move(storage_name), std::move(wal)); + + // Allocate mock sequence to run gc + UInt64 mock_sequence = 0; + for (auto & r : edit.getMutRecords()) + { + r.version.sequence = ++mock_sequence; + } + loadEdit(dir, edit); // Reset the `sequence` to the maximum of persisted. dir->sequence = max_applied_ver.sequence; + RUNTIME_CHECK(dir->sequence, mock_sequence); // After restoring from the disk, we need cleanup all invalid entries in memory, or it will // try to run GC again on some entries that are already marked as invalid in BlobStore. 
diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h index 0252729ad20..2472ed11c71 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h @@ -52,7 +52,7 @@ class PageDirectoryFactory PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal); // just for test - PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit); + PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, PageEntriesEdit & edit); // just for test PageDirectoryFactory & setBlobStats(BlobStats & blob_stats_) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index e5d0c299b45..82028655f19 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -1378,17 +1378,16 @@ try INSERT_ENTRY_TO(page_id, 5, 3); INSERT_ENTRY_TO(another_page_id, 6, 1); - // FIXME: This will copy many outdate pages // Full GC get entries auto candidate_entries_1 = dir->getEntriesByBlobIds({1}); EXPECT_EQ(candidate_entries_1.first.size(), 1); - EXPECT_EQ(candidate_entries_1.first[1].size(), 3); // 3 entries for 2 page id + EXPECT_EQ(candidate_entries_1.first[1].size(), 1); // 3 entries for 2 page id auto candidate_entries_2_3 = dir->getEntriesByBlobIds({2, 3}); - EXPECT_EQ(candidate_entries_2_3.first.size(), 2); + EXPECT_EQ(candidate_entries_2_3.first.size(), 1); const auto & entries_in_file2 = candidate_entries_2_3.first[2]; const auto & entries_in_file3 = candidate_entries_2_3.first[3]; - EXPECT_EQ(entries_in_file2.size(), 2); // 2 entries for 1 page id + EXPECT_EQ(entries_in_file2.empty(), true); EXPECT_EQ(entries_in_file3.size(), 1); // 1 entries for 1 page id 
PageEntriesEdit gc_migrate_entries; @@ -1411,6 +1410,10 @@ try // Full GC execute apply dir->gcApply(std::move(gc_migrate_entries)); + + auto snap = dir->createSnapshot(); + ASSERT_ENTRY_EQ(entry_v5, dir, page_id, snap); + ASSERT_ENTRY_EQ(entry_v6, dir, another_page_id, snap); } CATCH @@ -1429,20 +1432,16 @@ try EXPECT_EQ(dir->numPages(), 2); - // 1.1 Full GC get entries for blob_id in [1] + // A.1 Full GC get entries for blob_id in [1] auto candidate_entries_1 = dir->getEntriesByBlobIds({1}); EXPECT_EQ(candidate_entries_1.first.size(), 1); - EXPECT_EQ(candidate_entries_1.first[1].size(), 3); // 3 entries for 2 page id + EXPECT_EQ(candidate_entries_1.first[1].size(), 1); // 1 entries for `another_page_id` // for blob_id in [2, 3] auto candidate_entries_2_3 = dir->getEntriesByBlobIds({2, 3}); - EXPECT_EQ(candidate_entries_2_3.first.size(), 2); - const auto & entries_in_file2 = candidate_entries_2_3.first[2]; - const auto & entries_in_file3 = candidate_entries_2_3.first[3]; - EXPECT_EQ(entries_in_file2.size(), 2); // 2 entries for 1 page id - EXPECT_EQ(entries_in_file3.size(), 1); // 1 entry for 1 page_id + EXPECT_EQ(candidate_entries_2_3.first.empty(), true); - // 2.1 Execute GC + // B.1 Execute GC dir->gcInMemEntries(); // `page_id` get removed EXPECT_EQ(dir->numPages(), 1); @@ -1465,8 +1464,12 @@ try } } - // 1.2 Full GC execute apply - ASSERT_THROW({ dir->gcApply(std::move(gc_migrate_entries)); }, DB::Exception); + // A.2 Full GC execute apply, upsert `another_page_id`, but we still don't + // support Full GC and gcInMem run conncurrently + dir->gcApply(std::move(gc_migrate_entries)); + + auto snap = dir->createSnapshot(); + ASSERT_ENTRY_EQ(entry_v6, dir, another_page_id, snap); } CATCH @@ -1550,7 +1553,7 @@ try } CATCH -TEST_F(PageDirectoryGCTest, UpsertOnRefedEntries) +TEST_F(PageDirectoryGCTest, RewriteRefedId) try { // 10->entry1, 11->10, 12->10 @@ -1577,17 +1580,28 @@ try EXPECT_TRUE(outdated_entries.empty()); } - // upsert 10->entry2 PageEntryV3 
entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123 + 1024, .checksum = 0x4567}; { - PageEntriesEdit edit; + // this will return ref page 11 and 12 that need to be rewritten + // to new blob file. auto full_gc_entries = dir->getEntriesByBlobIds({1}); + ASSERT_EQ(full_gc_entries.first.size(), 1); auto ids = full_gc_entries.first.at(1); - ASSERT_EQ(ids.size(), 1); + ASSERT_EQ(ids.size(), 2); + ASSERT_EQ(std::get<0>(ids[0]), buildV3Id(TEST_NAMESPACE_ID, 11)); + ASSERT_EQ(std::get<0>(ids[1]), buildV3Id(TEST_NAMESPACE_ID, 12)); + + // upsert 11->entry2 + // upsert 12->entry3 + PageEntriesEdit edit; edit.upsertPage(std::get<0>(ids[0]), std::get<1>(ids[0]), entry2); + edit.upsertPage(std::get<0>(ids[1]), std::get<1>(ids[1]), entry3); + // this will rewrite ref page 11, 12 to normal page dir->gcApply(std::move(edit)); } + // page 10 get removed auto removed_entries = dir->gcInMemEntries(); ASSERT_EQ(removed_entries.size(), 1); EXPECT_SAME_ENTRY(removed_entries[0], entry1); @@ -1595,7 +1609,8 @@ try { auto snap = dir->createSnapshot(); EXPECT_ENTRY_EQ(entry2, dir, 11, snap); - EXPECT_ENTRY_EQ(entry2, dir, 12, snap); + EXPECT_ENTRY_EQ(entry3, dir, 12, snap); + EXPECT_ENTRY_NOT_EXIST(dir, 10, snap); } // del 11->entry2 @@ -1603,18 +1618,97 @@ try PageEntriesEdit edit; edit.del(buildV3Id(TEST_NAMESPACE_ID, 11)); dir->apply(std::move(edit)); - EXPECT_EQ(dir->gcInMemEntries().size(), 0); + // entry2 get removed + auto outdated_entries = dir->gcInMemEntries(); + ASSERT_EQ(1, outdated_entries.size()); + EXPECT_SAME_ENTRY(entry2, outdated_entries[0]); } - // del 12->entry2 + // del 12->entry3 { PageEntriesEdit edit; edit.del(buildV3Id(TEST_NAMESPACE_ID, 12)); dir->apply(std::move(edit)); - // entry2 get removed + // entry3 get removed auto outdated_entries = dir->gcInMemEntries(); - EXPECT_EQ(1, outdated_entries.size()); - 
EXPECT_SAME_ENTRY(entry2, *outdated_entries.begin()); + ASSERT_EQ(1, outdated_entries.size()); + EXPECT_SAME_ENTRY(entry3, outdated_entries[0]); + } + + ASSERT_EQ(dir->getAllPageIds().empty(), true); +} +CATCH + +TEST_F(PageDirectoryGCTest, RewriteRefedIdWithConcurrentDelete) +try +{ + // 10->entry1, 11->10, 12->10 + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 10), entry1); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 11), buildV3Id(TEST_NAMESPACE_ID, 10)); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 12), buildV3Id(TEST_NAMESPACE_ID, 10)); + edit.del(buildV3Id(TEST_NAMESPACE_ID, 10)); + dir->apply(std::move(edit)); + } + // entry1 should not be removed + { + auto outdated_entries = dir->gcInMemEntries(); + EXPECT_TRUE(outdated_entries.empty()); + } + + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123 + 1024, .checksum = 0x4567}; + { + // this will return ref page 11 and 12 that need to be rewritten + // to new blob file. 
+ auto full_gc_entries = dir->getEntriesByBlobIds({1}); + ASSERT_EQ(full_gc_entries.first.size(), 1); + auto ids = full_gc_entries.first.at(1); + ASSERT_EQ(ids.size(), 2); + ASSERT_EQ(std::get<0>(ids[0]), buildV3Id(TEST_NAMESPACE_ID, 11)); + ASSERT_EQ(std::get<0>(ids[1]), buildV3Id(TEST_NAMESPACE_ID, 12)); + + // unlike `RewriteRefedId`, foreground delete 11, 12 before + // full gc apply upserts + PageEntriesEdit fore_edit; + fore_edit.del(buildV3Id(TEST_NAMESPACE_ID, 11)); + fore_edit.del(buildV3Id(TEST_NAMESPACE_ID, 12)); + dir->apply(std::move(fore_edit)); + + // full gc ends, apply upserts + // upsert 11->entry2 + // upsert 12->entry3 + PageEntriesEdit edit; + edit.upsertPage(std::get<0>(ids[0]), std::get<1>(ids[0]), entry2); + edit.upsertPage(std::get<0>(ids[1]), std::get<1>(ids[1]), entry3); + // this will rewrite ref page 11, 12 to normal page + dir->gcApply(std::move(edit)); + } + + // page 10,11,12 get removed + auto removed_entries = dir->gcInMemEntries(); + ASSERT_EQ(removed_entries.size(), 3); + EXPECT_SAME_ENTRY(removed_entries[0], entry1); + EXPECT_SAME_ENTRY(removed_entries[1], entry2); + EXPECT_SAME_ENTRY(removed_entries[2], entry3); + + { + auto snap = dir->createSnapshot(); + EXPECT_ENTRY_NOT_EXIST(dir, 11, snap); + EXPECT_ENTRY_NOT_EXIST(dir, 12, snap); + EXPECT_ENTRY_NOT_EXIST(dir, 10, snap); } + + ASSERT_EQ(dir->getAllPageIds().empty(), true); } CATCH @@ -1953,7 +2047,7 @@ try dir->apply(std::move(edit)); } - auto restore_from_edit = [](const PageEntriesEdit & edit, BlobStats & stats) { + auto restore_from_edit = [](PageEntriesEdit & edit, BlobStats & stats) { auto ctx = ::DB::tests::TiFlashTestEnv::getContext(); auto provider = ctx.getFileProvider(); auto path = getTemporaryPath(); @@ -1997,7 +2091,7 @@ try PageEntryV3 entry_50_1{.file_id = 1, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_50_2{.file_id = 2, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - auto 
restore_from_edit = [](const PageEntriesEdit & edit) { + auto restore_from_edit = [](PageEntriesEdit & edit) { auto ctx = ::DB::tests::TiFlashTestEnv::getContext(); auto provider = ctx.getFileProvider(); auto path = getTemporaryPath(); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp index 0d079e70bb5..4424c29608b 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp @@ -45,7 +45,7 @@ namespace PS::V3::tests #define INSERT_ENTRY(VERSION) INSERT_BLOBID_ENTRY(1, VERSION) #define INSERT_GC_ENTRY(VERSION, EPOCH) \ PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = 100 * (VERSION) + (EPOCH), .padded_size = 0, .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ - entries.createUpsertEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); + (void)entries.createUpsertEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); class VersionedEntriesTest : public ::testing::Test { From 01b7b2a8cf2069b3a9b1838fec8c1e1e2cc4ccf4 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 01:17:20 +0800 Subject: [PATCH 10/22] Add writebatch check --- dbms/src/Storages/Page/V3/BlobStore.cpp | 52 +++++++++++++++++++ .../Page/V3/tests/gtest_page_storage.cpp | 31 ++++++----- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 9859696bd65..8ec19c10486 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -35,6 +36,7 @@ #include #include #include +#include namespace ProfileEvents { @@ -238,12 +240,62 @@ PageEntriesEdit BlobStore::handleLargeWrite(DB::WriteBatch & wb, const WriteLimi return edit; } +void checkWriteBatch(const DB::WriteBatch & wb) +{ + enum 
class IdState + { + PUT = 1, + PUT_REF = 2, + PUT_REF_DEL = 3, + }; + std::unordered_map states; + for (const auto & w : wb.getWrites()) + { + switch (w.type) + { + case DB::WriteBatchWriteType::PUT: + case DB::WriteBatchWriteType::PUT_EXTERNAL: + { + if (states.find(w.page_id) == states.end()) + states[w.page_id] = IdState::PUT; + break; + } + case DB::WriteBatchWriteType::REF: + { + if (auto iter = states.find(w.ori_page_id); + iter != states.end() && iter->second == IdState::PUT) + { + iter->second = IdState::PUT_REF; + } + break; + } + case DB::WriteBatchWriteType::DEL: + { + if (auto iter = states.find(w.page_id); iter != states.end() && iter->second == IdState::PUT_REF) + { + throw Exception(fmt::format( + "Invalid write batch, put-ref-del inside one write batch! page_id={}.{}", + wb.getNamespaceId(), + iter->first), + ErrorCodes::LOGICAL_ERROR); + } + break; + } + + case DB::WriteBatchWriteType::UPSERT: + throw Exception("only check foreground writes"); + } + } +} + PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter) { ProfileEvents::increment(ProfileEvents::PSMWritePages, wb.putWriteCount()); const size_t all_page_data_size = wb.getTotalDataSize(); + checkWriteBatch(wb); + PageEntriesEdit edit; if (all_page_data_size == 0) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 7d1595a579f..2f455117573 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -437,40 +438,42 @@ try } CATCH +TEST_F(PageStorageTest, UnsupportWriteBatch) +{ + // This will make put && delete share the same + // version. If full gc happen, then the upsert + // entry can not be insert between put && delete. 
+ WriteBatch wb; + wb.putPage(1, default_tag, getDefaultBuffer(), buf_sz); + wb.putRefPage(2, 1); + wb.delPage(1); + ASSERT_THROW(page_storage->write(std::move(wb)), DB::Exception); +} + TEST_F(PageStorageTest, WriteMultipleBatchRead1) try { - const UInt64 tag = 0; - const size_t buf_sz = 1024; - char c_buff[buf_sz]; - for (size_t i = 0; i < buf_sz; ++i) - { - c_buff[i] = i % 0xff; - } - { WriteBatch batch; - ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); - batch.putPage(0, tag, buff, buf_sz); + batch.putPage(0, default_tag, getDefaultBuffer(), buf_sz); page_storage->write(std::move(batch)); } { WriteBatch batch; - ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); - batch.putPage(1, tag, buff, buf_sz); + batch.putPage(1, default_tag, getDefaultBuffer(), buf_sz); page_storage->write(std::move(batch)); } DB::Page page0 = page_storage->read(0); ASSERT_EQ(page0.data.size(), buf_sz); - ASSERT_EQ(page0.page_id, 0UL); + ASSERT_EQ(page0.page_id, 0); for (size_t i = 0; i < buf_sz; ++i) { EXPECT_EQ(*(page0.data.begin() + i), static_cast(i % 0xff)); } DB::Page page1 = page_storage->read(1); ASSERT_EQ(page1.data.size(), buf_sz); - ASSERT_EQ(page1.page_id, 1UL); + ASSERT_EQ(page1.page_id, 1); for (size_t i = 0; i < buf_sz; ++i) { EXPECT_EQ(*(page1.data.begin() + i), static_cast(i % 0xff)); From 746cf5f574e056e50b95f138cccb5f760f4ad32c Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 02:17:45 +0800 Subject: [PATCH 11/22] Fix ut after check WriteBatch --- dbms/src/Storages/Page/V3/BlobStore.cpp | 3 ++- .../Page/V3/tests/gtest_blob_store.cpp | 18 +----------------- .../Page/V3/tests/gtest_page_storage.cpp | 17 ++++++----------- .../Page/V3/tests/gtest_page_storage_gc.cpp | 4 ++++ 4 files changed, 13 insertions(+), 29 deletions(-) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 8ec19c10486..de2e9696ff1 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ 
b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -253,8 +253,9 @@ void checkWriteBatch(const DB::WriteBatch & wb) { switch (w.type) { - case DB::WriteBatchWriteType::PUT: case DB::WriteBatchWriteType::PUT_EXTERNAL: + break; + case DB::WriteBatchWriteType::PUT: { if (states.find(w.page_id) == states.end()) states[w.page_id] = IdState::PUT; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp index 255a598c834..1e6b77432ad 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp @@ -710,15 +710,6 @@ try wb.clear(); { - char c_buff[buff_size]; - - for (size_t i = 0; i < buff_size; ++i) - { - c_buff[i] = i & 0xff; - } - - ReadBufferPtr buff = std::make_shared(c_buff, buff_size); - wb.putPage(page_id, /*tag*/ 0, buff, buff_size); wb.putRefPage(page_id + 1, page_id); wb.delPage(page_id); @@ -726,18 +717,11 @@ try auto records = edit.getRecords(); auto record = records[0]; - ASSERT_EQ(record.type, EditRecordType::PUT); - ASSERT_EQ(record.page_id.low, page_id); - ASSERT_EQ(record.entry.offset, buff_size * 2); - ASSERT_EQ(record.entry.size, buff_size); - ASSERT_EQ(record.entry.file_id, 1); - - record = records[1]; ASSERT_EQ(record.type, EditRecordType::REF); ASSERT_EQ(record.page_id.low, page_id + 1); ASSERT_EQ(record.ori_page_id.low, page_id); - record = records[2]; + record = records[1]; ASSERT_EQ(record.type, EditRecordType::DEL); ASSERT_EQ(record.page_id.low, page_id); } diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 2f455117573..ec4494569a1 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1353,20 +1353,16 @@ CATCH TEST_F(PageStorageTest, readRefAfterRestore) try { - const size_t buf_sz = 1024; - char c_buff[buf_sz]; - - for (size_t i = 0; i < buf_sz; ++i) { - c_buff[i] = i % 
0xff; + WriteBatch batch; + batch.putPage(1, 0, getDefaultBuffer(), buf_sz, PageFieldSizes{{32, 64, 79, 128, 196, 256, 269}}); + batch.putRefPage(3, 1); + page_storage->write(std::move(batch)); } - { WriteBatch batch; - batch.putPage(1, 0, std::make_shared(c_buff, buf_sz), buf_sz, PageFieldSizes{{32, 64, 79, 128, 196, 256, 269}}); - batch.putRefPage(3, 1); batch.delPage(1); - batch.putPage(4, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); + batch.putPage(4, 0, getDefaultBuffer(), buf_sz, {}); page_storage->write(std::move(batch)); } @@ -1374,8 +1370,7 @@ try { WriteBatch batch; - memset(c_buff, 0, buf_sz); - batch.putPage(5, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); + batch.putPage(5, 0, getDefaultBuffer(), buf_sz, {}); page_storage->write(std::move(batch)); } diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp index 7a3cfda3f44..b1252b7bc03 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp @@ -190,6 +190,10 @@ try { WriteBatch batch; batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); + page_storage->write(std::move(batch)); + } + { + WriteBatch batch; batch.putRefPage(ref_page_id, page_id1); batch.delPage(page_id1); page_storage->write(std::move(batch)); From 77dd22655ac4ae39c6cf8b17abd9c4e1c76e3888 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 03:22:11 +0800 Subject: [PATCH 12/22] Refine ut --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 5 +- dbms/src/Storages/Page/V3/PageStorageImpl.cpp | 6 +- .../Page/V3/tests/gtest_page_storage_gc.cpp | 130 ++++++++++++++---- 3 files changed, 115 insertions(+), 26 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index f36d3e28406..00c13e6c820 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ 
-1387,6 +1387,7 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con } } + size_t num_ref_id_rewrite = 0; for (const auto & [ref_id, ori_id_ver] : ref_ids_maybe_rewrite) { const auto ori_id = std::get<0>(ori_id_ver); @@ -1408,10 +1409,12 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con blob_versioned_entries[entry->file_id].emplace_back(ref_id, ver, *entry); total_page_size += entry->size; total_page_nums += 1; + num_ref_id_rewrite += 1; } } - LOG_INFO(log, "Get entries by blob ids done. [total_page_size={}] [total_page_nums={}]", // + LOG_INFO(log, "Get entries by blob ids done [rewrite_ref_pages={}] [total_page_size={}] [total_page_nums={}]", // + num_ref_id_rewrite, total_page_size, // total_page_nums); return std::make_pair(std::move(blob_versioned_entries), total_page_size); diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index a6c6e40864d..38664aae5f0 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -427,7 +427,7 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & const auto & del_entries = page_directory->gcInMemEntries(); statistics.gc_in_mem_entries_ms = gc_watch.elapsedMillisecondsFromLastTime(); - SYNC_FOR("before_PageStorageImpl::doGC_fullGC"); + SYNC_FOR("before_PageStorageImpl::doGC_fullGC_prepare"); // 2. Remove the expired entries in BlobStore. // It won't delete the data on the disk. @@ -462,6 +462,8 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & return statistics; } + SYNC_FOR("before_PageStorageImpl::doGC_fullGC_commit"); + // 5. Do the BlobStore GC // After BlobStore GC, these entries will be migrated to a new blob. // Then we should notify MVCC apply the change. 
@@ -479,6 +481,8 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & page_directory->gcApply(std::move(gc_edit), write_limiter); statistics.full_gc_apply_ms = gc_watch.elapsedMillisecondsFromLastTime(); + SYNC_FOR("after_PageStorageImpl::doGC_fullGC_commit"); + cleanExternalPage(gc_watch, statistics); statistics.stage = GCStageType::FullGC; statistics.total_cost_ms = gc_watch.elapsedMilliseconds(); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp index b1252b7bc03..93cf7785cb0 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -24,6 +25,7 @@ #include #include +#include namespace DB { @@ -34,24 +36,24 @@ extern const char force_ps_wal_compact[]; namespace PS::V3::tests { -struct TestParam +struct FullGCParam { bool keep_snap; bool has_ref; - explicit TestParam(const std::tuple & t) + explicit FullGCParam(const std::tuple & t) : keep_snap(std::get<0>(t)) , has_ref(std::get<1>(t)) { } }; -class PageStorageFullGCTestWithParam +class PageStorageFullGCTest : public PageStorageTest , public testing::WithParamInterface> { public: - PageStorageFullGCTestWithParam() + PageStorageFullGCTest() : test_param(GetParam()) { } @@ -62,10 +64,10 @@ class PageStorageFullGCTestWithParam } protected: - TestParam test_param; + FullGCParam test_param; }; -TEST_P(PageStorageFullGCTestWithParam, DontMoveDeletedPageId) +TEST_P(PageStorageFullGCTest, DontMoveDeletedPageId) try { // always pick all blob file for full gc @@ -111,13 +113,70 @@ CATCH INSTANTIATE_TEST_CASE_P( Group, - PageStorageFullGCTestWithParam, + PageStorageFullGCTest, ::testing::Combine( ::testing::Bool(), ::testing::Bool())); +/////// +/// PageStorageFullGCConcurrentTest +/////// -TEST_F(PageStorageTest, DeletePageIdAfterWALCompact) +enum class 
DeleteTiming +{ + BeforeFullGCPrepare = 1, + BeforeFullGCCommit, + AfterFullGCCommit, +}; +class PageStorageFullGCConcurrentTest + : public PageStorageTest + , public testing::WithParamInterface +{ +public: + PageStorageFullGCConcurrentTest() + : timing(GetParam()) + { + } + + void SetUp() override + { + PageStorageTest::SetUp(); + } + + SyncPointScopeGuard getSyncPoint() const + { + switch (timing) + { + case DeleteTiming::BeforeFullGCPrepare: + return SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_fullGC_prepare"); + case DeleteTiming::BeforeFullGCCommit: + return SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_fullGC_commit"); + case DeleteTiming::AfterFullGCCommit: + return SyncPointCtl::enableInScope("after_PageStorageImpl::doGC_fullGC_commit"); + } + } + + bool expectFullGCExecute() const + { + // - If delete happen before prepare, then nothing need to be rewrite. + // - If delete happen before commit, the page is logically delete. + // We should able to handle the "upsert after delete" on disk. 
+ switch (timing) + { + case DeleteTiming::BeforeFullGCPrepare: + return false; + case DeleteTiming::BeforeFullGCCommit: + return true; + case DeleteTiming::AfterFullGCCommit: + return true; + } + } + +protected: + DeleteTiming timing; +}; + +TEST_P(PageStorageFullGCConcurrentTest, DeletePage) try { // always pick all blob file for full gc @@ -133,23 +192,22 @@ try } FailPointHelper::enableFailPoint(FailPoints::force_ps_wal_compact); - auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_fullGC"); + auto sp_gc = getSyncPoint(); auto th_gc = std::async([&]() { auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); - ASSERT_FALSE(done_full_gc); + ASSERT_EQ(expectFullGCExecute(), done_full_gc); }); // let's compact the WAL logs sp_gc.waitAndPause(); - // If page is logically delete between `tryDumpSnapshot` and `fullGC`, then - // we should able to handle the "upsert after delete" on disk { + // the delete timing is decide by `sp_gc` WriteBatch batch; batch.delPage(page_id1); page_storage->write(std::move(batch)); } - // let's try full gc, this will not trigger full gc + // let's try full gc sp_gc.next(); th_gc.get(); @@ -177,7 +235,7 @@ try } CATCH -TEST_F(PageStorageTest, DeleteRefPageIdAfterWALCompact) +TEST_P(PageStorageFullGCConcurrentTest, DeleteRefPage) try { // always pick all blob file for full gc @@ -186,7 +244,9 @@ try page_storage->reloadSettings(new_config); PageId page_id1 = 101; - PageId ref_page_id = 102; + PageId ref_page_id2 = 102; + PageId ref_page_id3 = 103; + PageId ref_page_id4 = 104; { WriteBatch batch; batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); @@ -194,34 +254,44 @@ try } { WriteBatch batch; - batch.putRefPage(ref_page_id, page_id1); + batch.putRefPage(ref_page_id2, page_id1); batch.delPage(page_id1); page_storage->write(std::move(batch)); } + { + WriteBatch batch; + batch.putRefPage(ref_page_id3, ref_page_id2); + page_storage->write(std::move(batch)); + } + { + WriteBatch 
batch; + batch.putRefPage(ref_page_id4, ref_page_id3); + page_storage->write(std::move(batch)); + } FailPointHelper::enableFailPoint(FailPoints::force_ps_wal_compact); - auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_fullGC"); + auto sp_gc = getSyncPoint(); auto th_gc = std::async([&]() { auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); - ASSERT_TRUE(done_full_gc); // note that full gc is executed + ASSERT_EQ(expectFullGCExecute(), done_full_gc); }); // let's compact the WAL logs sp_gc.waitAndPause(); - // If page is logically delete between `tryDumpSnapshot` and `fullGC`, then - // we should able to handle the "upsert after delete" on disk { + // the delete timing is decide by `sp_gc` WriteBatch batch; - batch.delPage(ref_page_id); + batch.delPage(ref_page_id2); + batch.delPage(ref_page_id3); + batch.delPage(ref_page_id4); page_storage->write(std::move(batch)); } - // let's try full gc, unlike `DeletePageIdAfterWALCompact`, - // this ** will ** trigger full gc + // let's try full gc sp_gc.next(); th_gc.get(); - // wal compact again !!!! 
FIXME: this will throw exception + // wal compact again page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); LOG_INFO(log, "close and restore WAL from disk"); @@ -245,5 +315,17 @@ try } CATCH +INSTANTIATE_TEST_CASE_P( + DeleteTiming, + PageStorageFullGCConcurrentTest, + ::testing::Values( + DeleteTiming::BeforeFullGCPrepare, + DeleteTiming::BeforeFullGCCommit, + DeleteTiming::AfterFullGCCommit // + ), + [](const ::testing::TestParamInfo & param) { + return String(magic_enum::enum_name(param.param)); + }); + } // namespace PS::V3::tests } // namespace DB From d77e3fb8dcbb546dbbdd5012a8653b066af61b07 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 10:33:25 +0800 Subject: [PATCH 13/22] Incr the sequence by len of EditRecords --- dbms/src/Storages/Page/V3/BlobStore.cpp | 51 ------------------- dbms/src/Storages/Page/V3/PageDirectory.cpp | 22 ++++---- .../Page/V3/tests/gtest_blob_store.cpp | 18 ++++++- .../Page/V3/tests/gtest_page_storage.cpp | 39 +++++++------- .../Page/V3/tests/gtest_page_storage_gc.cpp | 4 -- 5 files changed, 51 insertions(+), 83 deletions(-) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index de2e9696ff1..6dee4f0b088 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -240,63 +240,12 @@ PageEntriesEdit BlobStore::handleLargeWrite(DB::WriteBatch & wb, const WriteLimi return edit; } -void checkWriteBatch(const DB::WriteBatch & wb) -{ - enum class IdState - { - PUT = 1, - PUT_REF = 2, - PUT_REF_DEL = 3, - }; - std::unordered_map states; - for (const auto & w : wb.getWrites()) - { - switch (w.type) - { - case DB::WriteBatchWriteType::PUT_EXTERNAL: - break; - case DB::WriteBatchWriteType::PUT: - { - if (states.find(w.page_id) == states.end()) - states[w.page_id] = IdState::PUT; - break; - } - case DB::WriteBatchWriteType::REF: - { - if (auto iter = states.find(w.ori_page_id); - iter != states.end() && iter->second == 
IdState::PUT) - { - iter->second = IdState::PUT_REF; - } - break; - } - case DB::WriteBatchWriteType::DEL: - { - if (auto iter = states.find(w.page_id); iter != states.end() && iter->second == IdState::PUT_REF) - { - throw Exception(fmt::format( - "Invalid write batch, put-ref-del inside one write batch! page_id={}.{}", - wb.getNamespaceId(), - iter->first), - ErrorCodes::LOGICAL_ERROR); - } - break; - } - - case DB::WriteBatchWriteType::UPSERT: - throw Exception("only check foreground writes"); - } - } -} - PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter) { ProfileEvents::increment(ProfileEvents::PSMWritePages, wb.putWriteCount()); const size_t all_page_data_size = wb.getTotalDataSize(); - checkWriteBatch(wb); - PageEntriesEdit edit; if (all_page_data_size == 0) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 00c13e6c820..2a9cd9c9590 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -1237,13 +1237,15 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write // also needs to be protected by `write_lock` throughout the `apply` // TODO: It is totally serialized, make it a pipeline std::unique_lock write_lock(table_rw_mutex); - UInt64 last_sequence = sequence.load(); - PageVersion new_version(last_sequence + 1, 0); + const UInt64 last_sequence = sequence.load(); + // new sequence allocated in this edit + UInt64 new_sequence = last_sequence + 1; - // stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0] + // stage 1, persisted the changes to WAL with new versions for (auto & r : edit.getMutRecords()) { - r.version = new_version; + r.version = PageVersion(new_sequence, 0); + new_sequence += 1; } wal->apply(ser::serializeTo(edit), write_limiter); @@ -1266,7 +1268,7 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write { case 
EditRecordType::PUT_EXTERNAL: { - auto holder = version_list->createNewExternal(new_version); + auto holder = version_list->createNewExternal(r.version); if (holder) { // put the new created holder into `external_ids` @@ -1276,13 +1278,13 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write break; } case EditRecordType::PUT: - version_list->createNewEntry(new_version, r.entry); + version_list->createNewEntry(r.version, r.entry); break; case EditRecordType::DEL: - version_list->createDelete(new_version); + version_list->createDelete(r.version); break; case EditRecordType::REF: - applyRefEditRecord(mvcc_table_directory, version_list, r, new_version); + applyRefEditRecord(mvcc_table_directory, version_list, r, r.version); break; case EditRecordType::UPSERT: case EditRecordType::VAR_DELETE: @@ -1294,13 +1296,13 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write } catch (DB::Exception & e) { - e.addMessage(fmt::format(" [type={}] [page_id={}] [ver={}] [edit_size={}]", magic_enum::enum_name(r.type), r.page_id, new_version, edit.size())); + e.addMessage(fmt::format(" [type={}] [page_id={}] [ver={}] [edit_size={}]", magic_enum::enum_name(r.type), r.page_id, r.version, edit.size())); e.rethrow(); } } // stage 3, the edit committed, incr the sequence number to publish changes for `createSnapshot` - sequence.fetch_add(1); + sequence.fetch_add(new_sequence - last_sequence); } void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp index 1e6b77432ad..255a598c834 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp @@ -710,6 +710,15 @@ try wb.clear(); { + char c_buff[buff_size]; + + for (size_t i = 0; i < buff_size; ++i) + { + c_buff[i] = i & 0xff; + } + + ReadBufferPtr buff = 
std::make_shared(c_buff, buff_size); + wb.putPage(page_id, /*tag*/ 0, buff, buff_size); wb.putRefPage(page_id + 1, page_id); wb.delPage(page_id); @@ -717,11 +726,18 @@ try auto records = edit.getRecords(); auto record = records[0]; + ASSERT_EQ(record.type, EditRecordType::PUT); + ASSERT_EQ(record.page_id.low, page_id); + ASSERT_EQ(record.entry.offset, buff_size * 2); + ASSERT_EQ(record.entry.size, buff_size); + ASSERT_EQ(record.entry.file_id, 1); + + record = records[1]; ASSERT_EQ(record.type, EditRecordType::REF); ASSERT_EQ(record.page_id.low, page_id + 1); ASSERT_EQ(record.ori_page_id.low, page_id); - record = records[1]; + record = records[2]; ASSERT_EQ(record.type, EditRecordType::DEL); ASSERT_EQ(record.page_id.low, page_id); } diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index ec4494569a1..e0f661ad2be 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -438,17 +438,17 @@ try } CATCH -TEST_F(PageStorageTest, UnsupportWriteBatch) -{ - // This will make put && delete share the same - // version. If full gc happen, then the upsert - // entry can not be insert between put && delete. - WriteBatch wb; - wb.putPage(1, default_tag, getDefaultBuffer(), buf_sz); - wb.putRefPage(2, 1); - wb.delPage(1); - ASSERT_THROW(page_storage->write(std::move(wb)), DB::Exception); -} +// TEST_F(PageStorageTest, UnsupportWriteBatch) +// { +// // This will make put && delete share the same +// // version. If full gc happen, then the upsert +// // entry can not be insert between put && delete. 
+// WriteBatch wb; +// wb.putPage(1, default_tag, getDefaultBuffer(), buf_sz); +// wb.putRefPage(2, 1); +// wb.delPage(1); +// ASSERT_THROW(page_storage->write(std::move(wb)), DB::Exception); +// } TEST_F(PageStorageTest, WriteMultipleBatchRead1) try @@ -1353,16 +1353,20 @@ CATCH TEST_F(PageStorageTest, readRefAfterRestore) try { + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + + for (size_t i = 0; i < buf_sz; ++i) { - WriteBatch batch; - batch.putPage(1, 0, getDefaultBuffer(), buf_sz, PageFieldSizes{{32, 64, 79, 128, 196, 256, 269}}); - batch.putRefPage(3, 1); - page_storage->write(std::move(batch)); + c_buff[i] = i % 0xff; } + { WriteBatch batch; + batch.putPage(1, 0, std::make_shared(c_buff, buf_sz), buf_sz, PageFieldSizes{{32, 64, 79, 128, 196, 256, 269}}); + batch.putRefPage(3, 1); batch.delPage(1); - batch.putPage(4, 0, getDefaultBuffer(), buf_sz, {}); + batch.putPage(4, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); page_storage->write(std::move(batch)); } @@ -1370,7 +1374,8 @@ try { WriteBatch batch; - batch.putPage(5, 0, getDefaultBuffer(), buf_sz, {}); + memset(c_buff, 0, buf_sz); + batch.putPage(5, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); page_storage->write(std::move(batch)); } diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp index 93cf7785cb0..6e6e9bceba7 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp @@ -250,10 +250,6 @@ try { WriteBatch batch; batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); - page_storage->write(std::move(batch)); - } - { - WriteBatch batch; batch.putRefPage(ref_page_id2, page_id1); batch.delPage(page_id1); page_storage->write(std::move(batch)); From 82a27a37241bcb2fc7eee14f1c75005179cffcd6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 10:34:42 +0800 Subject: [PATCH 14/22] Revert rate limiter test changes --- 
dbms/src/Encryption/tests/gtest_rate_limiter.cpp | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp index 0d36e755323..613e552def5 100644 --- a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp +++ b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -386,11 +385,9 @@ TEST(IORateLimiterTest, IOStat) ASSERT_EQ(io_rate_limiter.bg_read_limiter, nullptr); ASSERT_EQ(io_rate_limiter.fg_read_limiter, nullptr); - String dir = TiFlashTestEnv::getTemporaryPath(); - TiFlashTestEnv::tryRemovePath(dir, /*recreate*/ true); - String fname = dir + "/rate_limit_io_stat_test"; + std::string fname = "/tmp/rate_limit_io_stat_test"; int fd = ::open(fname.c_str(), O_CREAT | O_RDWR | O_DIRECT, 0666); - ASSERT_GT(fd, 0) << fmt::format("path:{}, err: {}", fname, strerror(errno)); + ASSERT_GT(fd, 0) << strerror(errno); std::unique_ptr> defer_close(&fd, [](const int * fd) { ::close(*fd); }); void * buf = nullptr; @@ -413,9 +410,6 @@ TEST(IORateLimiterTest, IOStat) TEST(IORateLimiterTest, IOStatMultiThread) { - String dir = TiFlashTestEnv::getTemporaryPath(); - TiFlashTestEnv::tryRemovePath(dir, /*recreate*/ true); - std::mutex bg_pids_mtx; std::vector bg_pids; auto add_bg_pid = [&](pid_t tid) { @@ -433,9 +427,9 @@ TEST(IORateLimiterTest, IOStatMultiThread) { add_bg_pid(syscall(SYS_gettid)); } - String fname = fmt::format("{}/rate_limit_io_stat_test_{}_{}", dir, id, (is_bg ? "bg" : "fg")); + std::string fname = "/tmp/rate_limit_io_stat_test_" + std::to_string(id) + (is_bg ? 
"_bg" : "_fg"); int fd = ::open(fname.c_str(), O_CREAT | O_RDWR | O_DIRECT, 0666); - ASSERT_GT(fd, 0) << fmt::format("path:{}, err: {}", fname, strerror(errno)); + ASSERT_GT(fd, 0) << strerror(errno); std::unique_ptr> defer_close(&fd, [](const int * fd) { ::close(*fd); }); void * buf = nullptr; From 0cb44924111690d26a8cdcf7187c1af8f066a082 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 11:10:17 +0800 Subject: [PATCH 15/22] Update comments --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 7 ++++++- .../Storages/Page/V3/tests/gtest_page_storage.cpp | 12 ------------ 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 2a9cd9c9590..f781853143e 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -1242,6 +1242,8 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write UInt64 new_sequence = last_sequence + 1; // stage 1, persisted the changes to WAL with new versions + // Inorder to handle {put X, ref Y->X, del X} inside one WriteBatch (and + // in later batch pipeline), we will increase the sequence for each record. for (auto & r : edit.getMutRecords()) { r.version = PageVersion(new_sequence, 0); @@ -1353,6 +1355,7 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con blob_id_set.insert(blob_id); assert(blob_id_set.size() == blob_ids.size()); + // TODO: return the max entry.size to make `BlobStore::gc` more clean std::map blob_versioned_entries; PageSize total_page_size = 0; UInt64 total_page_nums = 0; @@ -1389,6 +1392,8 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con } } + // For the non-deleted ref-ids, we will check whether theirs original entries lay on + // `blob_id_set`. Rewrite the entries for these ref-ids to be normal pages. 
size_t num_ref_id_rewrite = 0; for (const auto & [ref_id, ori_id_ver] : ref_ids_maybe_rewrite) { @@ -1415,7 +1420,7 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con } } - LOG_INFO(log, "Get entries by blob ids done [rewrite_ref_pages={}] [total_page_size={}] [total_page_nums={}]", // + LOG_INFO(log, "Get entries by blob ids done [rewrite_ref_page_num={}] [total_page_size={}] [total_page_nums={}]", // num_ref_id_rewrite, total_page_size, // total_page_nums); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index e0f661ad2be..a2ca7f6ab26 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -438,18 +438,6 @@ try } CATCH -// TEST_F(PageStorageTest, UnsupportWriteBatch) -// { -// // This will make put && delete share the same -// // version. If full gc happen, then the upsert -// // entry can not be insert between put && delete. 
-// WriteBatch wb; -// wb.putPage(1, default_tag, getDefaultBuffer(), buf_sz); -// wb.putRefPage(2, 1); -// wb.delPage(1); -// ASSERT_THROW(page_storage->write(std::move(wb)), DB::Exception); -// } - TEST_F(PageStorageTest, WriteMultipleBatchRead1) try { From 7cbe80d044cfc510b1926e258dfccaed8f019405 Mon Sep 17 00:00:00 2001 From: JaySon Date: Thu, 3 Nov 2022 11:50:58 +0800 Subject: [PATCH 16/22] Update comments --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 3 +++ dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp | 2 ++ 2 files changed, 5 insertions(+) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index f781853143e..dc165ca2f3e 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -561,6 +561,7 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( auto page_lock = acquireLock(); if (type == EditRecordType::VAR_REF) { + // If the ref-id is not deleted, we will check whether its origin_entry.file_id in blob_ids if (!is_deleted) { ref_ids_maybe_rewrite[page_id] = {ori_page_id, create_ver}; @@ -579,6 +580,7 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( if (iter->second.isDelete()) return 0; + // If `entry.file_id in blob_ids` we will rewrite this non-deleted page to a new location assert(iter->second.isEntry()); // The total entries size that will be moved PageSize entry_size_full_gc = 0; @@ -1333,6 +1335,7 @@ void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiter auto id_to_deref = versioned_entries->createUpsertEntry(record.version, record.entry); if (id_to_deref.low != INVALID_PAGE_ID) { + // The ref-page is rewritten into a normal page, we need to decrease the ref-count of original page MVCCMapType::const_iterator deref_iter; { std::shared_lock read_lock(table_rw_mutex); diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 1e433674c8f..1b2493b9c35 
100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -200,7 +200,9 @@ void PageDirectoryFactory::applyRecord( auto id_to_deref = version_list->createUpsertEntry(restored_version, r.entry); if (id_to_deref.low != INVALID_PAGE_ID) { + // The ref-page is rewritten into a normal page, we need to decrease the ref-count of the original page auto deref_iter = dir->mvcc_table_directory.find(id_to_deref); + RUNTIME_CHECK_MSG(deref_iter != dir->mvcc_table_directory.end(), "Can't find [page_id={}] to deref when applying upsert", id_to_deref); auto deref_res = deref_iter->second->derefAndClean(/*lowest_seq*/ 0, id_to_deref, restored_version, 1, nullptr); RUNTIME_ASSERT(!deref_res); } From 571505782d0559c185e3a8c2d4c282813f94b118 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 12:02:54 +0800 Subject: [PATCH 17/22] Format files --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index dc165ca2f3e..5a65bd33f18 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -580,7 +580,7 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( if (iter->second.isDelete()) return 0; - // If `entry.file_id in blob_ids` we will rewrite this non-deleted page to a new location + // If `entry.file_id in blob_ids` we will rewrite this non-deleted page to a new location assert(iter->second.isEntry()); // The total entries size that will be moved PageSize entry_size_full_gc = 0; From a28b8a0da007416a53396b3693acc04cf97ba9fa Mon Sep 17 00:00:00 2001 From: JaySon Date: Thu, 3 Nov 2022 14:28:50 +0800 Subject: [PATCH 18/22] Apply suggestions from code review Co-authored-by: lidezhu <47731263+lidezhu@users.noreply.github.com> --- dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 82028655f19..865da8e1a43 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -1381,7 +1381,7 @@ try // Full GC get entries auto candidate_entries_1 = dir->getEntriesByBlobIds({1}); EXPECT_EQ(candidate_entries_1.first.size(), 1); - EXPECT_EQ(candidate_entries_1.first[1].size(), 1); // 3 entries for 2 page id + EXPECT_EQ(candidate_entries_1.first[1].size(), 1); // 1 entry for 1 page id auto candidate_entries_2_3 = dir->getEntriesByBlobIds({2, 3}); EXPECT_EQ(candidate_entries_2_3.first.size(), 1); From 699e1bfc7f0c2264f502c1c8c50e800439524080 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 14:30:54 +0800 Subject: [PATCH 19/22] Update comments --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 5a65bd33f18..26867eb108f 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -170,10 +170,10 @@ PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver { // Full gc commit, we need to rewrite this page to // be normal page with upsert-entry. - // TODO: Also we need to decrease the ref-count of ori_page_id. entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); is_deleted = false; type = EditRecordType::VAR_ENTRY; + // Also we need to decrease the ref-count of ori_page_id. 
return ori_page_id; } else @@ -185,6 +185,9 @@ PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver entries.emplace(delete_ver, EntryOrDelete::newDelete()); is_deleted = false; type = EditRecordType::VAR_ENTRY; + // Though the ref-id is marked as deleted, the ref-count of + // ori_page_id is not decreased. Return the ori_page_id + // for decreasing ref-count. return ori_page_id; } } From 23632d16519a876cae66c6126841f32a601ba3c8 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 19:19:51 +0800 Subject: [PATCH 20/22] Update comments --- dbms/src/Storages/Page/V3/PageDirectory.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 6fb09672347..cf0f829a615 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -162,6 +162,10 @@ class VersionedPageEntries void createNewEntry(const PageVersion & ver, const PageEntryV3 & entry); + // Commit the upsert entry after full gc. + // Return a PageId. If the page id is valid, it means a RefPage has been rewritten into + // a normal Page. Caller must call `derefAndClean` to decrease the ref-count of + // the returned page id. 
[[nodiscard]] PageIdV3Internal createUpsertEntry(const PageVersion & ver, const PageEntryV3 & entry); bool createNewRef(const PageVersion & ver, PageIdV3Internal ori_page_id); From 2af750b920d44a8640bdb557d6de155e6ee7ccb2 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 21:55:47 +0800 Subject: [PATCH 21/22] Address comment --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 26867eb108f..63e07f6eed5 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -123,13 +123,7 @@ PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver { auto page_lock = acquireLock(); - if (type == EditRecordType::VAR_DELETE) - { - type = EditRecordType::VAR_ENTRY; - assert(entries.empty()); - entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); - return buildV3Id(TEST_NAMESPACE_ID, INVALID_PAGE_ID); - } + // For applying upsert entry, only `VAR_ENTRY`/`VAR_REF` is valid state. if (type == EditRecordType::VAR_ENTRY) { @@ -160,7 +154,7 @@ PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver // create a new version that inherit the `being_ref_count` of the last entry entries.emplace(ver, EntryOrDelete::newReplacingEntry(last_iter->second, entry)); } - return buildV3Id(TEST_NAMESPACE_ID, INVALID_PAGE_ID); + return buildV3Id(0, INVALID_PAGE_ID); } if (type == EditRecordType::VAR_REF) @@ -168,8 +162,8 @@ PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver // an ref-page is rewritten into a normal page if (!is_deleted) { - // Full gc commit, we need to rewrite this page to - // be normal page with upsert-entry. + // Full GC has rewritten new data on disk, we need to update this RefPage + // to be a normal page with the upsert-entry. 
entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); is_deleted = false; type = EditRecordType::VAR_ENTRY; @@ -178,9 +172,10 @@ PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver } else { - // The ref-id is deleted before full gc commit, - // we need to rewrite this page to be normal page - // with upsert-entry and a delete. + // The ref-id is deleted before full gc commit, but the new entry is + // rewritten into `entry`. We need to update this RefPage to be a + // normal page with upsert-entry and a delete. Then later GC will + // remove the useless data on `entry`. entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); entries.emplace(delete_ver, EntryOrDelete::newDelete()); is_deleted = false; From d1cad72f40390a545c07b0244eed7ef4e127a5d1 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Nov 2022 21:57:03 +0800 Subject: [PATCH 22/22] fix typo --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 63e07f6eed5..2cf551e47f1 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -172,7 +172,7 @@ PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver } else { - // The ref-id is deleted before full gc commit, but the new entry is + // The ref-id is deleted before full gc commit, but the data is // rewritten into `entry`. We need to update this RefPage to be a // normal page with upsert-entry and a delete. Then later GC will // remove the useless data on `entry`.