diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 3ebfdbe4358..8c2e838bf75 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -69,7 +69,8 @@ std::unordered_map> FailPointHelper::f M(exception_mpp_hash_build) \ M(exception_before_drop_segment) \ M(exception_after_drop_segment) \ - M(exception_between_schema_change_in_the_same_diff) + M(exception_between_schema_change_in_the_same_diff) \ + M(force_ps_wal_compact) #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ diff --git a/dbms/src/Encryption/tests/gtest_encryption_test.cpp b/dbms/src/Encryption/tests/gtest_encryption_test.cpp index 2fa20631cbd..08f04f392b7 100644 --- a/dbms/src/Encryption/tests/gtest_encryption_test.cpp +++ b/dbms/src/Encryption/tests/gtest_encryption_test.cpp @@ -216,14 +216,24 @@ TEST_P(EncryptionTest, EncryptionTest) EXPECT_TRUE(testEncryption(16, 16 * 2, test::IV_OVERFLOW_FULL)); } +INSTANTIATE_TEST_CASE_P( + EncryptionTestInstance, + EncryptionTest, + testing::Combine( + testing::Bool(), + testing::Values( + EncryptionMethod::Aes128Ctr, + EncryptionMethod::Aes192Ctr, + EncryptionMethod::Aes256Ctr, #if USE_GM_SSL -INSTANTIATE_TEST_CASE_P(EncryptionTestInstance, EncryptionTest, testing::Combine(testing::Bool(), testing::Values(EncryptionMethod::Aes128Ctr, EncryptionMethod::Aes192Ctr, EncryptionMethod::Aes256Ctr, EncryptionMethod::SM4Ctr))); + EncryptionMethod::SM4Ctr #elif OPENSSL_VERSION_NUMBER < 0x1010100fL || defined(OPENSSL_NO_SM4) -INSTANTIATE_TEST_CASE_P(EncryptionTestInstance, EncryptionTest, testing::Combine(testing::Bool(), testing::Values(EncryptionMethod::Aes128Ctr, EncryptionMethod::Aes192Ctr, EncryptionMethod::Aes256Ctr))); +// not support SM4 #else -// Openssl support SM4 after 1.1.1 release version. -INSTANTIATE_TEST_CASE_P(EncryptionTestInstance, EncryptionTest, testing::Combine(testing::Bool(), testing::Values(EncryptionMethod::Aes128Ctr, EncryptionMethod::Aes192Ctr, EncryptionMethod::Aes256Ctr, EncryptionMethod::SM4Ctr))); + // Openssl support SM4 after 1.1.1 release version. + EncryptionMethod::SM4Ctr #endif + ))); TEST(PosixWritableFileTest, test) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index d30ad6f3cb9..6dee4f0b088 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,7 @@ #include #include #include +#include #include #include @@ -34,6 +36,7 @@ #include #include #include +#include namespace ProfileEvents { @@ -1005,7 +1008,7 @@ std::vector BlobStore::getGCStats() // Check if GC is required if (stat->sm_valid_rate <= config.heavy_gc_valid_rate) { - LOG_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, Need do compact GC", stat->id, stat->sm_valid_rate); + LOG_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, full GC", stat->id, stat->sm_valid_rate); blob_need_gc.emplace_back(stat->id); // Change current stat to read only @@ -1015,7 +1018,7 @@ std::vector BlobStore::getGCStats() else { blobstore_gc_info.appendToNoNeedGCBlob(stat->id, stat->sm_valid_rate); - LOG_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, No need to GC.", stat->id, stat->sm_valid_rate); + LOG_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, no need to GC", stat->id, stat->sm_valid_rate); } if (right_margin != stat->sm_total_size) @@ -1049,7 +1052,7 @@ PageEntriesEdit BlobStore::gc(std::map & { throw Exception("BlobStore can't do gc if nothing need gc.", ErrorCodes::LOGICAL_ERROR); } - LOG_INFO(log, "BlobStore gc will migrate {:.2f}MB into new Blobs", (1.0 * total_page_size / DB::MB)); + LOG_INFO(log, "BlobStore gc will migrate {} into new blob files", formatReadableSizeWithBinarySuffix(total_page_size)); auto write_blob = [this, total_page_size, &written_blobs, &write_limiter](const BlobFileId & file_id, char * data_begin, diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 46a719c2f84..2cf551e47f1 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -73,6 +74,7 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt if (type == EditRecordType::VAR_DELETE) { type = EditRecordType::VAR_ENTRY; + assert(entries.empty()); entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); return; } @@ -117,6 +119,83 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt ErrorCodes::PS_DIR_APPLY_INVALID_STATUS); } +PageIdV3Internal VersionedPageEntries::createUpsertEntry(const PageVersion & ver, const PageEntryV3 & entry) +{ + auto page_lock = acquireLock(); + + // For applying upsert entry, only `VAR_ENTRY`/`VAR_REF` is valid state. + + if (type == EditRecordType::VAR_ENTRY) + { + auto last_iter = MapUtils::findLess(entries, PageVersion(ver.sequence + 1, 0)); + if (last_iter == entries.end()) + { + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + } + else if (last_iter->second.isDelete()) + { + // append after delete + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + } + else + { + assert(last_iter->second.isEntry()); + // It is ok to replace the entry with same sequence and newer epoch, but not valid + // to replace the entry with newer sequence. + if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence)) + { + throw Exception( + fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]", + ver, + last_iter->first, + last_iter->second.toDebugString()), + ErrorCodes::LOGICAL_ERROR); + } + // create a new version that inherit the `being_ref_count` of the last entry + entries.emplace(ver, EntryOrDelete::newReplacingEntry(last_iter->second, entry)); + } + return buildV3Id(0, INVALID_PAGE_ID); + } + + if (type == EditRecordType::VAR_REF) + { + // an ref-page is rewritten into a normal page + if (!is_deleted) + { + // Full GC has rewritten new data on disk, we need to update this RefPage + // to be a normal page with the upsert-entry. + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + is_deleted = false; + type = EditRecordType::VAR_ENTRY; + // Also we need to decrease the ref-count of ori_page_id. + return ori_page_id; + } + else + { + // The ref-id is deleted before full gc commit, but the data is + // rewritten into `entry`. We need to update this RefPage to be a + // be normal page with upsert-entry and a delete. Then later GC will + // remove the useless data on `entry`. + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + entries.emplace(delete_ver, EntryOrDelete::newDelete()); + is_deleted = false; + type = EditRecordType::VAR_ENTRY; + // Though the ref-id is marked as deleted, but the ref-count of + // ori_page_id is not decreased. Return the ori_page_id + // for decreasing ref-count. + return ori_page_id; + } + } + + throw Exception( + fmt::format("try to create upsert entry version with invalid state " + "[ver={}] [entry={}] [state={}]", + ver, + ::DB::PS::V3::toDebugString(entry), + toDebugString()), + ErrorCodes::PS_DIR_APPLY_INVALID_STATUS); +} + // Create a new external version with version=`ver`. // If create success, then return a shared_ptr as a holder for page_id. The holder // will be release when this external version is totally removed. @@ -374,13 +453,15 @@ std::optional VersionedPageEntries::getEntry(UInt64 seq) const return std::nullopt; } -std::optional VersionedPageEntries::getLastEntry() const +std::optional VersionedPageEntries::getLastEntry(std::optional seq) const { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) { for (auto it_r = entries.rbegin(); it_r != entries.rend(); it_r++) { + if (seq.has_value() && it_r->first.sequence > seq.value()) + continue; if (it_r->second.isEntry()) { return it_r->second.entry; @@ -466,34 +547,48 @@ Int64 VersionedPageEntries::incrRefCount(const PageVersion & ver) PageSize VersionedPageEntries::getEntriesByBlobIds( const std::unordered_set & blob_ids, PageIdV3Internal page_id, - std::map & blob_versioned_entries) + std::map & blob_versioned_entries, + std::map> & ref_ids_maybe_rewrite) { + // `blob_versioned_entries`: // blob_file_0, [, - // , // ] // blob_file_1, [...] // ... - // the total entries size taken out - PageSize total_entries_size = 0; + auto page_lock = acquireLock(); - if (type == EditRecordType::VAR_ENTRY) + if (type == EditRecordType::VAR_REF) { - for (const auto & [ver, entry_or_del] : entries) + // If the ref-id is not deleted, we will check whether its origin_entry.file_id in blob_ids + if (!is_deleted) { - if (!entry_or_del.isEntry()) - { - continue; - } - - const auto & entry = entry_or_del.entry; - if (blob_ids.count(entry.file_id) > 0) - { - blob_versioned_entries[entry.file_id].emplace_back(page_id, ver, entry); - total_entries_size += entry.size; - } + ref_ids_maybe_rewrite[page_id] = {ori_page_id, create_ver}; } + return 0; } - return total_entries_size; + + if (type != EditRecordType::VAR_ENTRY) + return 0; + + assert(type == EditRecordType::VAR_ENTRY); + // Empty or already deleted + if (entries.empty()) + return 0; + auto iter = entries.rbegin(); + if (iter->second.isDelete()) + return 0; + + // If `entry.file_id in blob_ids` we will rewrite this non-deleted page to a new location + assert(iter->second.isEntry()); + // The total entries size that will be moved + PageSize entry_size_full_gc = 0; + const auto & last_entry = iter->second; + if (blob_ids.count(last_entry.entry.file_id) > 0) + { + blob_versioned_entries[last_entry.entry.file_id].emplace_back(page_id, /* ver */ iter->first, last_entry.entry); + entry_size_full_gc += last_entry.entry.size; + } + return entry_size_full_gc; } bool VersionedPageEntries::cleanOutdatedEntries( @@ -635,6 +730,8 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag } iter->second.being_ref_count -= deref_count; + if (lowest_seq == 0) + return false; // Clean outdated entries after decreased the ref-counter // set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock); @@ -1140,13 +1237,17 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write // also needs to be protected by `write_lock` throughout the `apply` // TODO: It is totally serialized, make it a pipeline std::unique_lock write_lock(table_rw_mutex); - UInt64 last_sequence = sequence.load(); - PageVersion new_version(last_sequence + 1, 0); + const UInt64 last_sequence = sequence.load(); + // new sequence allocated in this edit + UInt64 new_sequence = last_sequence + 1; - // stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0] + // stage 1, persisted the changes to WAL with new versions + // Inorder to handle {put X, ref Y->X, del X} inside one WriteBatch (and + // in later batch pipeline), we will increase the sequence for each record. for (auto & r : edit.getMutRecords()) { - r.version = new_version; + r.version = PageVersion(new_sequence, 0); + new_sequence += 1; } wal->apply(ser::serializeTo(edit), write_limiter); @@ -1169,7 +1270,7 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write { case EditRecordType::PUT_EXTERNAL: { - auto holder = version_list->createNewExternal(new_version); + auto holder = version_list->createNewExternal(r.version); if (holder) { // put the new created holder into `external_ids` @@ -1179,13 +1280,13 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write break; } case EditRecordType::PUT: - version_list->createNewEntry(new_version, r.entry); + version_list->createNewEntry(r.version, r.entry); break; case EditRecordType::DEL: - version_list->createDelete(new_version); + version_list->createDelete(r.version); break; case EditRecordType::REF: - applyRefEditRecord(mvcc_table_directory, version_list, r, new_version); + applyRefEditRecord(mvcc_table_directory, version_list, r, r.version); break; case EditRecordType::UPSERT: case EditRecordType::VAR_DELETE: @@ -1197,13 +1298,13 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write } catch (DB::Exception & e) { - e.addMessage(fmt::format(" [type={}] [page_id={}] [ver={}] [edit_size={}]", magic_enum::enum_name(r.type), r.page_id, new_version, edit.size())); + e.addMessage(fmt::format(" [type={}] [page_id={}] [ver={}] [edit_size={}]", magic_enum::enum_name(r.type), r.page_id, r.version, edit.size())); e.rethrow(); } } // stage 3, the edit committed, incr the sequence number to publish changes for `createSnapshot` - sequence.fetch_add(1); + sequence.fetch_add(new_sequence - last_sequence); } void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter) @@ -1224,15 +1325,24 @@ void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiter { std::shared_lock read_lock(table_rw_mutex); iter = mvcc_table_directory.find(record.page_id); - if (unlikely(iter == mvcc_table_directory.end())) - { - throw Exception(fmt::format("Can't find [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR); - } + RUNTIME_CHECK_MSG(iter != mvcc_table_directory.end(), "Can't find [page_id={}] while doing gcApply", record.page_id); } // release the read lock on `table_rw_mutex` // Append the gc version to version list const auto & versioned_entries = iter->second; - versioned_entries->createNewEntry(record.version, record.entry); + auto id_to_deref = versioned_entries->createUpsertEntry(record.version, record.entry); + if (id_to_deref.low != INVALID_PAGE_ID) + { + // The ref-page is rewritten into a normal page, we need to decrease the ref-count of original page + MVCCMapType::const_iterator deref_iter; + { + std::shared_lock read_lock(table_rw_mutex); + deref_iter = mvcc_table_directory.find(id_to_deref); + RUNTIME_CHECK_MSG(deref_iter != mvcc_table_directory.end(), "Can't find [page_id={}] to deref after gcApply", id_to_deref); + } + auto deref_res = deref_iter->second->derefAndClean(/*lowest_seq*/ 0, id_to_deref, record.version, 1, nullptr); + RUNTIME_ASSERT(!deref_res); + } } LOG_INFO(log, "GC apply done. [edit size={}]", migrated_edit.size()); @@ -1246,47 +1356,73 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con blob_id_set.insert(blob_id); assert(blob_id_set.size() == blob_ids.size()); + // TODO: return the max entry.size to make `BlobStore::gc` more clean std::map blob_versioned_entries; PageSize total_page_size = 0; - - MVCCMapType::const_iterator iter; - { - std::shared_lock read_lock(table_rw_mutex); - iter = mvcc_table_directory.cbegin(); - if (iter == mvcc_table_directory.end()) - return {blob_versioned_entries, total_page_size}; - } - UInt64 total_page_nums = 0; - while (true) + std::map> ref_ids_maybe_rewrite; + { - // `iter` is an iter that won't be invalid cause by `apply`/`gcApply`. - // do scan on the version list without lock on `mvcc_table_directory`. - auto page_id = iter->first; - const auto & version_entries = iter->second; - auto single_page_size = version_entries->getEntriesByBlobIds(blob_id_set, page_id, blob_versioned_entries); - total_page_size += single_page_size; - if (single_page_size != 0) + MVCCMapType::const_iterator iter; { - total_page_nums++; + std::shared_lock read_lock(table_rw_mutex); + iter = mvcc_table_directory.cbegin(); + if (iter == mvcc_table_directory.end()) + return {blob_versioned_entries, total_page_size}; } + while (true) { - std::shared_lock read_lock(table_rw_mutex); - iter++; - if (iter == mvcc_table_directory.end()) - break; + // `iter` is an iter that won't be invalid cause by `apply`/`gcApply`. + // do scan on the version list without lock on `mvcc_table_directory`. + auto page_id = iter->first; + const auto & version_entries = iter->second; + auto single_page_size = version_entries->getEntriesByBlobIds(blob_id_set, page_id, blob_versioned_entries, ref_ids_maybe_rewrite); + total_page_size += single_page_size; + if (single_page_size != 0) + { + total_page_nums++; + } + + { + std::shared_lock read_lock(table_rw_mutex); + iter++; + if (iter == mvcc_table_directory.end()) + break; + } } } - for (const auto blob_id : blob_ids) + + // For the non-deleted ref-ids, we will check whether theirs original entries lay on + // `blob_id_set`. Rewrite the entries for these ref-ids to be normal pages. + size_t num_ref_id_rewrite = 0; + for (const auto & [ref_id, ori_id_ver] : ref_ids_maybe_rewrite) { - if (blob_versioned_entries.find(blob_id) == blob_versioned_entries.end()) + const auto ori_id = std::get<0>(ori_id_ver); + const auto ver = std::get<1>(ori_id_ver); + MVCCMapType::const_iterator page_iter; + { + std::shared_lock read_lock(table_rw_mutex); + page_iter = mvcc_table_directory.find(ori_id); + RUNTIME_CHECK(page_iter != mvcc_table_directory.end(), ref_id, ori_id, ver); + } + const auto & version_entries = page_iter->second; + // the latest entry with version.seq <= ref_id.create_ver.seq + auto entry = version_entries->getLastEntry(ver.sequence); + RUNTIME_CHECK(entry.has_value(), ref_id, ori_id, ver); + // If the being-ref entry lays on the full gc candidate blobfiles, then we + // need to rewrite the ref-id to a normal page. + if (blob_id_set.count(entry->file_id) > 0) { - throw Exception(fmt::format("Can't get any entries from [blob_id={}]", blob_id)); + blob_versioned_entries[entry->file_id].emplace_back(ref_id, ver, *entry); + total_page_size += entry->size; + total_page_nums += 1; + num_ref_id_rewrite += 1; } } - LOG_INFO(log, "Get entries by blob ids done. [total_page_size={}] [total_page_nums={}]", // + LOG_INFO(log, "Get entries by blob ids done [rewrite_ref_page_num={}] [total_page_size={}] [total_page_nums={}]", // + num_ref_id_rewrite, total_page_size, // total_page_nums); return std::make_pair(std::move(blob_versioned_entries), total_page_size); diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index c8f6864aeed..cf0f829a615 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -57,7 +57,7 @@ class PageDirectorySnapshot : public DB::PageStorageSnapshot CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots); } - ~PageDirectorySnapshot() + ~PageDirectorySnapshot() override { CurrentMetrics::sub(CurrentMetrics::PSMVCCNumSnapshots); } @@ -81,7 +81,7 @@ using PageDirectorySnapshotPtr = std::shared_ptr; struct EntryOrDelete { - bool is_delete; + bool is_delete = true; Int64 being_ref_count = 1; PageEntryV3 entry; @@ -162,6 +162,12 @@ class VersionedPageEntries void createNewEntry(const PageVersion & ver, const PageEntryV3 & entry); + // Commit the upsert entry after full gc. + // Return a PageId, if the page id is valid, it means it rewrite a RefPage into + // a normal Page. Caller must call `derefAndClean` to decrease the ref-count of + // the returing page id. + [[nodiscard]] PageIdV3Internal createUpsertEntry(const PageVersion & ver, const PageEntryV3 & entry); + bool createNewRef(const PageVersion & ver, PageIdV3Internal ori_page_id); std::shared_ptr createNewExternal(const PageVersion & ver); @@ -177,7 +183,7 @@ class VersionedPageEntries std::optional getEntry(UInt64 seq) const; - std::optional getLastEntry() const; + std::optional getLastEntry(std::optional seq) const; bool isVisible(UInt64 seq) const; @@ -189,7 +195,8 @@ class VersionedPageEntries PageSize getEntriesByBlobIds( const std::unordered_set & blob_ids, PageIdV3Internal page_id, - std::map & blob_versioned_entries); + std::map & blob_versioned_entries, + std::map> & ref_ids_maybe_rewrite); /** * Given a `lowest_seq`, this will clean all outdated entries before `lowest_seq`. @@ -202,12 +209,19 @@ class VersionedPageEntries * * Return `true` iff this page can be totally removed from the whole `PageDirectory`. */ - bool cleanOutdatedEntries( + [[nodiscard]] bool cleanOutdatedEntries( UInt64 lowest_seq, std::map> * normal_entries_to_deref, PageEntriesV3 * entries_removed, const PageLock & page_lock); - bool derefAndClean( + /** + * Decrease the ref-count of entry with given `deref_ver`. + * If `lowest_seq` != 0, then it will run `cleanOutdatedEntries` after decreasing + * the ref-count. + * + * Return `true` iff this page can be totally removed from the whole `PageDirectory`. + */ + [[nodiscard]] bool derefAndClean( UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 307d732ce79..1b2493b9c35 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -20,6 +21,7 @@ #include #include +#include namespace DB { @@ -62,7 +64,7 @@ PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WAL // We should restore the entry to `blob_stats` even if it is marked as "deleted", // or we will mistakenly reuse the space to write other blobs down into that space. // So we need to use `getLastEntry` instead of `getEntry(version)` here. - if (auto entry = entries->getLastEntry(); entry) + if (auto entry = entries->getLastEntry(std::nullopt); entry) { blob_stats->restoreByEntry(*entry); } @@ -75,14 +77,24 @@ PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WAL return dir; } -PageDirectoryPtr PageDirectoryFactory::createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit) +// just for test +PageDirectoryPtr PageDirectoryFactory::createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, PageEntriesEdit & edit) { auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator, WALConfig()); (void)reader; PageDirectoryPtr dir = std::make_unique(std::move(storage_name), std::move(wal)); + + // Allocate mock sequence to run gc + UInt64 mock_sequence = 0; + for (auto & r : edit.getMutRecords()) + { + r.version.sequence = ++mock_sequence; + } + loadEdit(dir, edit); // Reset the `sequence` to the maximum of persisted. dir->sequence = max_applied_ver.sequence; + RUNTIME_CHECK(dir->sequence, mock_sequence); // After restoring from the disk, we need cleanup all invalid entries in memory, or it will // try to run GC again on some entries that are already marked as invalid in BlobStore. @@ -102,7 +114,7 @@ PageDirectoryPtr PageDirectoryFactory::createFromEdit(String storage_name, FileP // We should restore the entry to `blob_stats` even if it is marked as "deleted", // or we will mistakenly reuse the space to write other blobs down into that space. // So we need to use `getLastEntry` instead of `getEntry(version)` here. - if (auto entry = entries->getLastEntry(); entry) + if (auto entry = entries->getLastEntry(std::nullopt); entry) { blob_stats->restoreByEntry(*entry); } @@ -184,9 +196,19 @@ void PageDirectoryFactory::applyRecord( restored_version); break; case EditRecordType::UPSERT: - version_list->createNewEntry(restored_version, r.entry); + { + auto id_to_deref = version_list->createUpsertEntry(restored_version, r.entry); + if (id_to_deref.low != INVALID_PAGE_ID) + { + // The ref-page is rewritten into a normal page, we need to decrease the ref-count of the original page + auto deref_iter = dir->mvcc_table_directory.find(id_to_deref); + RUNTIME_CHECK_MSG(deref_iter != dir->mvcc_table_directory.end(), "Can't find [page_id={}] to deref when applying upsert", id_to_deref); + auto deref_res = deref_iter->second->derefAndClean(/*lowest_seq*/ 0, id_to_deref, restored_version, 1, nullptr); + RUNTIME_ASSERT(!deref_res); + } break; } + } } catch (DB::Exception & e) { diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h index 0252729ad20..2472ed11c71 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h @@ -52,7 +52,7 @@ class PageDirectoryFactory PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal); // just for test - PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit); + PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, PageEntriesEdit & edit); // just for test PageDirectoryFactory & setBlobStats(BlobStats & blob_stats_) diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index 04f875f28fa..38664aae5f0 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -35,6 +36,10 @@ namespace ErrorCodes { extern const int NOT_IMPLEMENTED; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char force_ps_wal_compact[]; +} namespace PS::V3 { PageStorageImpl::PageStorageImpl( @@ -407,9 +412,13 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & GCTimeStatistics statistics; + // TODO: rewrite the GC process and split it into smaller interface + bool force_wal_compact = false; + fiu_do_on(FailPoints::force_ps_wal_compact, { force_wal_compact = true; }); + // 1. Do the MVCC gc, clean up expired snapshot. // And get the expired entries. - if (page_directory->tryDumpSnapshot(read_limiter, write_limiter)) + if (page_directory->tryDumpSnapshot(read_limiter, write_limiter, force_wal_compact)) { GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment(); } @@ -418,6 +427,8 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & const auto & del_entries = page_directory->gcInMemEntries(); statistics.gc_in_mem_entries_ms = gc_watch.elapsedMillisecondsFromLastTime(); + SYNC_FOR("before_PageStorageImpl::doGC_fullGC_prepare"); + // 2. Remove the expired entries in BlobStore. // It won't delete the data on the disk. // It will only update the SpaceMap which in memory. @@ -451,6 +462,8 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & return statistics; } + SYNC_FOR("before_PageStorageImpl::doGC_fullGC_commit"); + // 5. Do the BlobStore GC // After BlobStore GC, these entries will be migrated to a new blob. // Then we should notify MVCC apply the change. @@ -468,6 +481,8 @@ PageStorageImpl::GCTimeStatistics PageStorageImpl::doGC(const WriteLimiterPtr & page_directory->gcApply(std::move(gc_edit), write_limiter); statistics.full_gc_apply_ms = gc_watch.elapsedMillisecondsFromLastTime(); + SYNC_FOR("after_PageStorageImpl::doGC_fullGC_commit"); + cleanExternalPage(gc_watch, statistics); statistics.stage = GCStageType::FullGC; statistics.total_cost_ms = gc_watch.elapsedMilliseconds(); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index e5d0c299b45..865da8e1a43 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -1378,17 +1378,16 @@ try INSERT_ENTRY_TO(page_id, 5, 3); INSERT_ENTRY_TO(another_page_id, 6, 1); - // FIXME: This will copy many outdate pages // Full GC get entries auto candidate_entries_1 = dir->getEntriesByBlobIds({1}); EXPECT_EQ(candidate_entries_1.first.size(), 1); - EXPECT_EQ(candidate_entries_1.first[1].size(), 3); // 3 entries for 2 page id + EXPECT_EQ(candidate_entries_1.first[1].size(), 1); // 1 entries for 1 page id auto candidate_entries_2_3 = dir->getEntriesByBlobIds({2, 3}); - EXPECT_EQ(candidate_entries_2_3.first.size(), 2); + EXPECT_EQ(candidate_entries_2_3.first.size(), 1); const auto & entries_in_file2 = candidate_entries_2_3.first[2]; const auto & entries_in_file3 = candidate_entries_2_3.first[3]; - EXPECT_EQ(entries_in_file2.size(), 2); // 2 entries for 1 page id + EXPECT_EQ(entries_in_file2.empty(), true); EXPECT_EQ(entries_in_file3.size(), 1); // 1 entries for 1 page id PageEntriesEdit gc_migrate_entries; @@ -1411,6 +1410,10 @@ try // Full GC execute apply dir->gcApply(std::move(gc_migrate_entries)); + + auto snap = dir->createSnapshot(); + ASSERT_ENTRY_EQ(entry_v5, dir, page_id, snap); + ASSERT_ENTRY_EQ(entry_v6, dir, another_page_id, snap); } CATCH @@ -1429,20 +1432,16 @@ try EXPECT_EQ(dir->numPages(), 2); - // 1.1 Full GC get entries for blob_id in [1] + // A.1 Full GC get entries for blob_id in [1] auto candidate_entries_1 = dir->getEntriesByBlobIds({1}); EXPECT_EQ(candidate_entries_1.first.size(), 1); - EXPECT_EQ(candidate_entries_1.first[1].size(), 3); // 3 entries for 2 page id + EXPECT_EQ(candidate_entries_1.first[1].size(), 1); // 1 entries for `another_page_id` // for blob_id in [2, 3] auto candidate_entries_2_3 = dir->getEntriesByBlobIds({2, 3}); - EXPECT_EQ(candidate_entries_2_3.first.size(), 2); - const auto & entries_in_file2 = candidate_entries_2_3.first[2]; - const auto & entries_in_file3 = candidate_entries_2_3.first[3]; - EXPECT_EQ(entries_in_file2.size(), 2); // 2 entries for 1 page id - EXPECT_EQ(entries_in_file3.size(), 1); // 1 entry for 1 page_id + EXPECT_EQ(candidate_entries_2_3.first.empty(), true); - // 2.1 Execute GC + // B.1 Execute GC dir->gcInMemEntries(); // `page_id` get removed EXPECT_EQ(dir->numPages(), 1); @@ -1465,8 +1464,12 @@ try } } - // 1.2 Full GC execute apply - ASSERT_THROW({ dir->gcApply(std::move(gc_migrate_entries)); }, DB::Exception); + // A.2 Full GC execute apply, upsert `another_page_id`, but we still don't + // support Full GC and gcInMem run conncurrently + dir->gcApply(std::move(gc_migrate_entries)); + + auto snap = dir->createSnapshot(); + ASSERT_ENTRY_EQ(entry_v6, dir, another_page_id, snap); } CATCH @@ -1550,7 +1553,7 @@ try } CATCH -TEST_F(PageDirectoryGCTest, UpsertOnRefedEntries) +TEST_F(PageDirectoryGCTest, RewriteRefedId) try { // 10->entry1, 11->10, 12->10 @@ -1577,17 +1580,28 @@ try EXPECT_TRUE(outdated_entries.empty()); } - // upsert 10->entry2 PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123 + 1024, .checksum = 0x4567}; { - PageEntriesEdit edit; + // this will return ref page 11 and 12 that need to be rewritten + // to new blob file. auto full_gc_entries = dir->getEntriesByBlobIds({1}); + ASSERT_EQ(full_gc_entries.first.size(), 1); auto ids = full_gc_entries.first.at(1); - ASSERT_EQ(ids.size(), 1); + ASSERT_EQ(ids.size(), 2); + ASSERT_EQ(std::get<0>(ids[0]), buildV3Id(TEST_NAMESPACE_ID, 11)); + ASSERT_EQ(std::get<0>(ids[1]), buildV3Id(TEST_NAMESPACE_ID, 12)); + + // upsert 11->entry2 + // upsert 12->entry3 + PageEntriesEdit edit; edit.upsertPage(std::get<0>(ids[0]), std::get<1>(ids[0]), entry2); + edit.upsertPage(std::get<0>(ids[1]), std::get<1>(ids[1]), entry3); + // this will rewrite ref page 11, 12 to normal page dir->gcApply(std::move(edit)); } + // page 10 get removed auto removed_entries = dir->gcInMemEntries(); ASSERT_EQ(removed_entries.size(), 1); EXPECT_SAME_ENTRY(removed_entries[0], entry1); @@ -1595,7 +1609,8 @@ try { auto snap = dir->createSnapshot(); EXPECT_ENTRY_EQ(entry2, dir, 11, snap); - EXPECT_ENTRY_EQ(entry2, dir, 12, snap); + EXPECT_ENTRY_EQ(entry3, dir, 12, snap); + EXPECT_ENTRY_NOT_EXIST(dir, 10, snap); } // del 11->entry2 @@ -1603,18 +1618,97 @@ try PageEntriesEdit edit; edit.del(buildV3Id(TEST_NAMESPACE_ID, 11)); dir->apply(std::move(edit)); - EXPECT_EQ(dir->gcInMemEntries().size(), 0); + // entry2 get removed + auto outdated_entries = dir->gcInMemEntries(); + ASSERT_EQ(1, outdated_entries.size()); + EXPECT_SAME_ENTRY(entry2, outdated_entries[0]); } - // del 12->entry2 + // del 12->entry3 { PageEntriesEdit edit; edit.del(buildV3Id(TEST_NAMESPACE_ID, 12)); dir->apply(std::move(edit)); - // entry2 get removed + // entry3 get removed auto outdated_entries = dir->gcInMemEntries(); - EXPECT_EQ(1, outdated_entries.size()); - EXPECT_SAME_ENTRY(entry2, *outdated_entries.begin()); + ASSERT_EQ(1, outdated_entries.size()); + EXPECT_SAME_ENTRY(entry3, outdated_entries[0]); + } + + ASSERT_EQ(dir->getAllPageIds().empty(), true); +} +CATCH + +TEST_F(PageDirectoryGCTest, RewriteRefedIdWithConcurrentDelete) +try +{ + // 10->entry1, 11->10, 12->10 + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 10), entry1); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 11), buildV3Id(TEST_NAMESPACE_ID, 10)); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 12), buildV3Id(TEST_NAMESPACE_ID, 10)); + edit.del(buildV3Id(TEST_NAMESPACE_ID, 10)); + dir->apply(std::move(edit)); + } + // entry1 should not be removed + { + auto outdated_entries = dir->gcInMemEntries(); + EXPECT_TRUE(outdated_entries.empty()); + } + + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123 + 1024, .checksum = 0x4567}; + { + // this will return ref page 11 and 12 that need to be rewritten + // to new blob file. + auto full_gc_entries = dir->getEntriesByBlobIds({1}); + ASSERT_EQ(full_gc_entries.first.size(), 1); + auto ids = full_gc_entries.first.at(1); + ASSERT_EQ(ids.size(), 2); + ASSERT_EQ(std::get<0>(ids[0]), buildV3Id(TEST_NAMESPACE_ID, 11)); + ASSERT_EQ(std::get<0>(ids[1]), buildV3Id(TEST_NAMESPACE_ID, 12)); + + // unlike `RewriteRefedId`, foreground delete 11, 12 before + // full gc apply upserts + PageEntriesEdit fore_edit; + fore_edit.del(buildV3Id(TEST_NAMESPACE_ID, 11)); + fore_edit.del(buildV3Id(TEST_NAMESPACE_ID, 12)); + dir->apply(std::move(fore_edit)); + + // full gc ends, apply upserts + // upsert 11->entry2 + // upsert 12->entry3 + PageEntriesEdit edit; + edit.upsertPage(std::get<0>(ids[0]), std::get<1>(ids[0]), entry2); + edit.upsertPage(std::get<0>(ids[1]), std::get<1>(ids[1]), entry3); + // this will rewrite ref page 11, 12 to normal page + dir->gcApply(std::move(edit)); + } + + // page 10,11,12 get removed + auto removed_entries = dir->gcInMemEntries(); + ASSERT_EQ(removed_entries.size(), 3); + EXPECT_SAME_ENTRY(removed_entries[0], entry1); + EXPECT_SAME_ENTRY(removed_entries[1], entry2); + EXPECT_SAME_ENTRY(removed_entries[2], entry3); + + { + auto snap = dir->createSnapshot(); + EXPECT_ENTRY_NOT_EXIST(dir, 11, snap); + EXPECT_ENTRY_NOT_EXIST(dir, 12, snap); + EXPECT_ENTRY_NOT_EXIST(dir, 10, snap); } + + ASSERT_EQ(dir->getAllPageIds().empty(), true); } CATCH @@ -1953,7 +2047,7 @@ try dir->apply(std::move(edit)); } - auto restore_from_edit = [](const PageEntriesEdit & edit, BlobStats & stats) { + auto restore_from_edit = [](PageEntriesEdit & edit, BlobStats & stats) { auto ctx = ::DB::tests::TiFlashTestEnv::getContext(); auto provider = ctx.getFileProvider(); auto path = getTemporaryPath(); @@ -1997,7 +2091,7 @@ try PageEntryV3 entry_50_1{.file_id = 1, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_50_2{.file_id = 2, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - auto restore_from_edit = [](const PageEntriesEdit & edit) { + auto restore_from_edit = [](PageEntriesEdit & edit) { auto ctx = ::DB::tests::TiFlashTestEnv::getContext(); auto provider = ctx.getFileProvider(); auto path = getTemporaryPath(); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 9a63d28a126..a2ca7f6ab26 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include #include #include @@ -49,45 +51,6 @@ extern const char force_set_page_file_write_errno[]; namespace PS::V3::tests { -class PageStorageTest : public DB::base::TiFlashStorageTestBasic -{ -public: - void SetUp() override - { - TiFlashStorageTestBasic::SetUp(); - log = Logger::get(); - auto path = getTemporaryPath(); - createIfNotExist(path); - file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); - delegator = std::make_shared(path); - page_storage = std::make_shared("test.t", delegator, config, file_provider); - page_storage->restore(); - } - - std::shared_ptr reopenWithConfig(const PageStorageConfig & config_) - { - auto path = getTemporaryPath(); - delegator = std::make_shared(path); - auto storage = std::make_shared("test.t", delegator, config_, file_provider); - storage->restore(); - return storage; - } - - size_t getLogFileNum() - { - auto log_files = WALStoreReader::listAllFiles(delegator, log); - return log_files.size(); - } - -protected: - FileProviderPtr file_provider; - std::unique_ptr path_pool; - PSDiskDelegatorPtr delegator; - PageStorageConfig config; - std::shared_ptr page_storage; - - LoggerPtr log; -}; TEST_F(PageStorageTest, WriteRead) try @@ -478,37 +441,27 @@ CATCH TEST_F(PageStorageTest, WriteMultipleBatchRead1) try { - const UInt64 tag = 0; - const size_t buf_sz = 1024; - char c_buff[buf_sz]; - for (size_t i = 0; i < buf_sz; ++i) - { - c_buff[i] = i % 0xff; - } - { WriteBatch batch; - ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); - batch.putPage(0, tag, buff, buf_sz); + batch.putPage(0, default_tag, getDefaultBuffer(), buf_sz); page_storage->write(std::move(batch)); } { WriteBatch batch; - ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); - batch.putPage(1, tag, buff, buf_sz); + batch.putPage(1, default_tag, getDefaultBuffer(), buf_sz); page_storage->write(std::move(batch)); } DB::Page page0 = page_storage->read(0); ASSERT_EQ(page0.data.size(), buf_sz); - ASSERT_EQ(page0.page_id, 0UL); + ASSERT_EQ(page0.page_id, 0); for (size_t i = 0; i < buf_sz; ++i) { EXPECT_EQ(*(page0.data.begin() + i), static_cast(i % 0xff)); } DB::Page page1 = page_storage->read(1); ASSERT_EQ(page1.data.size(), buf_sz); - ASSERT_EQ(page1.page_id, 1UL); + ASSERT_EQ(page1.page_id, 1); for (size_t i = 0; i < buf_sz; ++i) { EXPECT_EQ(*(page1.data.begin() + i), static_cast(i % 0xff)); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.h b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.h new file mode 100644 index 00000000000..1b197a2e160 --- /dev/null +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.h @@ -0,0 +1,77 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB +{ +namespace PS::V3::tests +{ +class PageStorageTest : public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + + for (size_t i = 0; i < buf_sz; ++i) + c_buff[i] = i % 0xff; + + log = Logger::get(); + auto path = getTemporaryPath(); + createIfNotExist(path); + file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); + delegator = std::make_shared(path); + page_storage = std::make_shared(String(NAME), delegator, config, file_provider); + page_storage->restore(); + } + + std::shared_ptr reopenWithConfig(const PageStorageConfig & config_) + { + auto path = getTemporaryPath(); + delegator = std::make_shared(path); + auto storage = std::make_shared(String(NAME), delegator, config_, file_provider); + storage->restore(); + return storage; + } + + size_t getLogFileNum() + { + auto log_files = WALStoreReader::listAllFiles(delegator, log); + return log_files.size(); + } + + ReadBufferPtr getDefaultBuffer() const + { + return std::make_shared(c_buff, buf_sz); + } + +protected: + FileProviderPtr file_provider; + PSDiskDelegatorPtr delegator; + PageStorageConfig config; + std::shared_ptr page_storage; + + static constexpr std::string_view NAME{"test.t"}; + DB::UInt64 default_tag = 0; + constexpr static size_t buf_sz = 1024; + char c_buff[buf_sz]; + + LoggerPtr log; +}; +} // namespace PS::V3::tests +} // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp new file mode 100644 index 00000000000..6e6e9bceba7 --- /dev/null +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp @@ -0,0 +1,327 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace FailPoints +{ +extern const char force_ps_wal_compact[]; +} +namespace PS::V3::tests +{ + +struct FullGCParam +{ + bool keep_snap; + bool has_ref; + + explicit FullGCParam(const std::tuple & t) + : keep_snap(std::get<0>(t)) + , has_ref(std::get<1>(t)) + { + } +}; + +class PageStorageFullGCTest + : public PageStorageTest + , public testing::WithParamInterface> +{ +public: + PageStorageFullGCTest() + : test_param(GetParam()) + { + } + + void SetUp() override + { + PageStorageTest::SetUp(); + } + +protected: + FullGCParam test_param; +}; + +TEST_P(PageStorageFullGCTest, DontMoveDeletedPageId) +try +{ + // always pick all blob file for full gc + PageStorageConfig new_config; + new_config.blob_heavy_gc_valid_rate = 1.0; + page_storage->reloadSettings(new_config); + + PageId page_id1 = 101; + PageId ref_page_id = 102; + { + WriteBatch batch; + batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); + if (test_param.has_ref) + batch.putRefPage(ref_page_id, page_id1); + page_storage->write(std::move(batch)); + } + + // If the page id is logically delete before `tryDumpSnapshot`, then + // full gc won't rewrite + { + WriteBatch batch; + batch.delPage(page_id1); + if (test_param.has_ref) + batch.delPage(ref_page_id); + page_storage->write(std::move(batch)); + } + + { + PageStorageSnapshotPtr snap; + if (test_param.keep_snap) + snap = page_storage->getSnapshot(""); + + // let's compact the WAL logs + auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); + ASSERT_TRUE(done_snapshot); + + // let's try full gc, this will not trigger full gc + auto done_full_gc = page_storage->gcImpl(true, nullptr, nullptr); + ASSERT_FALSE(done_full_gc); + } +} +CATCH + +INSTANTIATE_TEST_CASE_P( + Group, + PageStorageFullGCTest, + ::testing::Combine( + ::testing::Bool(), + ::testing::Bool())); + +/////// +/// PageStorageFullGCConcurrentTest +/////// + +enum class DeleteTiming +{ + BeforeFullGCPrepare = 1, + BeforeFullGCCommit, + AfterFullGCCommit, +}; +class PageStorageFullGCConcurrentTest + : public PageStorageTest + , public testing::WithParamInterface +{ +public: + PageStorageFullGCConcurrentTest() + : timing(GetParam()) + { + } + + void SetUp() override + { + PageStorageTest::SetUp(); + } + + SyncPointScopeGuard getSyncPoint() const + { + switch (timing) + { + case DeleteTiming::BeforeFullGCPrepare: + return SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_fullGC_prepare"); + case DeleteTiming::BeforeFullGCCommit: + return SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_fullGC_commit"); + case DeleteTiming::AfterFullGCCommit: + return SyncPointCtl::enableInScope("after_PageStorageImpl::doGC_fullGC_commit"); + } + } + + bool expectFullGCExecute() const + { + // - If delete happen before prepare, then nothing need to be rewrite. + // - If delete happen before commit, the page is logically delete. + // We should able to handle the "upsert after delete" on disk. + switch (timing) + { + case DeleteTiming::BeforeFullGCPrepare: + return false; + case DeleteTiming::BeforeFullGCCommit: + return true; + case DeleteTiming::AfterFullGCCommit: + return true; + } + } + +protected: + DeleteTiming timing; +}; + +TEST_P(PageStorageFullGCConcurrentTest, DeletePage) +try +{ + // always pick all blob file for full gc + PageStorageConfig new_config; + new_config.blob_heavy_gc_valid_rate = 1.0; + page_storage->reloadSettings(new_config); + + PageId page_id1 = 101; + { + WriteBatch batch; + batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); + page_storage->write(std::move(batch)); + } + + FailPointHelper::enableFailPoint(FailPoints::force_ps_wal_compact); + auto sp_gc = getSyncPoint(); + auto th_gc = std::async([&]() { + auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); + ASSERT_EQ(expectFullGCExecute(), done_full_gc); + }); + // let's compact the WAL logs + sp_gc.waitAndPause(); + + { + // the delete timing is decide by `sp_gc` + WriteBatch batch; + batch.delPage(page_id1); + page_storage->write(std::move(batch)); + } + + // let's try full gc + sp_gc.next(); + th_gc.get(); + + // wal compact again + page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); + + LOG_INFO(log, "close and restore WAL from disk"); + page_storage.reset(); + + // There is no any entry on wal log-files + auto [wal, reader] = WALStore::create(String(NAME), file_provider, delegator, WALConfig::from(new_config)); + UNUSED(wal); + size_t num_entries_on_wal = 0; + while (reader->remained()) + { + auto s = reader->next(); + if (s.has_value()) + { + auto e = ser::deserializeFrom(s.value()); + num_entries_on_wal += e.size(); + EXPECT_TRUE(e.empty()); + } + } + ASSERT_EQ(num_entries_on_wal, 0); +} +CATCH + +TEST_P(PageStorageFullGCConcurrentTest, DeleteRefPage) +try +{ + // always pick all blob file for full gc + PageStorageConfig new_config; + new_config.blob_heavy_gc_valid_rate = 1.0; + page_storage->reloadSettings(new_config); + + PageId page_id1 = 101; + PageId ref_page_id2 = 102; + PageId ref_page_id3 = 103; + PageId ref_page_id4 = 104; + { + WriteBatch batch; + batch.putPage(page_id1, default_tag, getDefaultBuffer(), buf_sz); + batch.putRefPage(ref_page_id2, page_id1); + batch.delPage(page_id1); + page_storage->write(std::move(batch)); + } + { + WriteBatch batch; + batch.putRefPage(ref_page_id3, ref_page_id2); + page_storage->write(std::move(batch)); + } + { + WriteBatch batch; + batch.putRefPage(ref_page_id4, ref_page_id3); + page_storage->write(std::move(batch)); + } + + FailPointHelper::enableFailPoint(FailPoints::force_ps_wal_compact); + auto sp_gc = getSyncPoint(); + auto th_gc = std::async([&]() { + auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); + ASSERT_EQ(expectFullGCExecute(), done_full_gc); + }); + // let's compact the WAL logs + sp_gc.waitAndPause(); + + { + // the delete timing is decide by `sp_gc` + WriteBatch batch; + batch.delPage(ref_page_id2); + batch.delPage(ref_page_id3); + batch.delPage(ref_page_id4); + page_storage->write(std::move(batch)); + } + + // let's try full gc + sp_gc.next(); + th_gc.get(); + + // wal compact again + page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); + + LOG_INFO(log, "close and restore WAL from disk"); + page_storage.reset(); + + // There is no any entry on wal log-files + auto [wal, reader] = WALStore::create(String(NAME), file_provider, delegator, WALConfig::from(new_config)); + UNUSED(wal); + size_t num_entries_on_wal = 0; + while (reader->remained()) + { + auto s = reader->next(); + if (s.has_value()) + { + auto e = ser::deserializeFrom(s.value()); + num_entries_on_wal += e.size(); + EXPECT_TRUE(e.empty()); + } + } + ASSERT_EQ(num_entries_on_wal, 0); +} +CATCH + +INSTANTIATE_TEST_CASE_P( + DeleteTiming, + PageStorageFullGCConcurrentTest, + ::testing::Values( + DeleteTiming::BeforeFullGCPrepare, + DeleteTiming::BeforeFullGCCommit, + DeleteTiming::AfterFullGCCommit // + ), + [](const ::testing::TestParamInfo & param) { + return String(magic_enum::enum_name(param.param)); + }); + +} // namespace PS::V3::tests +} // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp index b95941153e0..4424c29608b 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp @@ -45,7 +45,7 @@ namespace PS::V3::tests #define INSERT_ENTRY(VERSION) INSERT_BLOBID_ENTRY(1, VERSION) #define INSERT_GC_ENTRY(VERSION, EPOCH) \ PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = 100 * (VERSION) + (EPOCH), .padded_size = 0, .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ - entries.createNewEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); + (void)entries.createUpsertEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); class VersionedEntriesTest : public ::testing::Test { @@ -531,54 +531,15 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) PageId page_id = 100; auto check_for_blob_id_1 = [&](const PageIdAndVersionedEntries & entries) { auto it = entries.begin(); - - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 1); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v1); - - it++; - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 2); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v2); - - it++; - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 5); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v5); - - it++; ASSERT_EQ(std::get<0>(*it).low, page_id); ASSERT_EQ(std::get<1>(*it).sequence, 11); ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v11); }; - auto check_for_blob_id_2 = [&](const PageIdAndVersionedEntries & entries) { - auto it = entries.begin(); - - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 3); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v3); - - it++; - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 4); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v4); - }; - auto check_for_blob_id_3 = [&](const PageIdAndVersionedEntries & entries) { - auto it = entries.begin(); - - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 6); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v6); - - it++; - ASSERT_EQ(std::get<0>(*it).low, page_id); - ASSERT_EQ(std::get<1>(*it).sequence, 8); - ASSERT_SAME_ENTRY(std::get<2>(*it), entry_v8); - }; { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({/*empty*/}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({/*empty*/}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 0); ASSERT_EQ(total_size, 0); @@ -586,92 +547,80 @@ TEST_F(VersionedEntriesTest, getEntriesByBlobId) { std::map blob_entries; + std::map> rewrite; const BlobFileId blob_id = 1; - PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); ASSERT_EQ(blob_entries.size(), 1); - ASSERT_EQ(blob_entries[blob_id].size(), 4); - ASSERT_EQ(total_size, 1 + 2 + 5 + 11); + ASSERT_EQ(blob_entries[blob_id].size(), 1); + ASSERT_EQ(total_size, 11); check_for_blob_id_1(blob_entries[blob_id]); } { std::map blob_entries; + std::map> rewrite; const BlobFileId blob_id = 2; - PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 1); - ASSERT_EQ(blob_entries[blob_id].size(), 2); - ASSERT_EQ(total_size, 3 + 4); - check_for_blob_id_2(blob_entries[blob_id]); + ASSERT_EQ(blob_entries.empty(), true); + ASSERT_EQ(total_size, 0); } { std::map blob_entries; + std::map> rewrite; const BlobFileId blob_id = 3; - PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + PageSize total_size = entries.getEntriesByBlobIds({blob_id}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 1); - ASSERT_EQ(blob_entries[blob_id].size(), 2); - ASSERT_EQ(total_size, 6 + 8); - check_for_blob_id_3(blob_entries[blob_id]); + ASSERT_EQ(blob_entries.empty(), true); + ASSERT_EQ(total_size, 0); } // {1, 2} { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({1, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({1, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 2); - ASSERT_EQ(blob_entries[1].size(), 4); - ASSERT_EQ(blob_entries[2].size(), 2); - ASSERT_EQ(total_size, (1 + 2 + 5 + 11) + (3 + 4)); + ASSERT_EQ(blob_entries.size(), 1); + ASSERT_EQ(blob_entries[1].size(), 1); + ASSERT_EQ(total_size, 11); check_for_blob_id_1(blob_entries[1]); - check_for_blob_id_2(blob_entries[2]); } // {2, 3} { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); - - ASSERT_EQ(blob_entries.size(), 2); - ASSERT_EQ(blob_entries[2].size(), 2); - ASSERT_EQ(blob_entries[3].size(), 2); - ASSERT_EQ(total_size, (6 + 8) + (3 + 4)); - check_for_blob_id_2(blob_entries[2]); - check_for_blob_id_3(blob_entries[3]); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); + + ASSERT_EQ(blob_entries.empty(), true); + ASSERT_EQ(total_size, 0); } // {1, 2, 3} { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); - ASSERT_EQ(blob_entries.size(), 3); - ASSERT_EQ(blob_entries[1].size(), 4); - ASSERT_EQ(blob_entries[2].size(), 2); - ASSERT_EQ(blob_entries[3].size(), 2); - ASSERT_EQ(total_size, (1 + 2 + 5 + 11) + (6 + 8) + (3 + 4)); + ASSERT_EQ(blob_entries.size(), 1); + ASSERT_EQ(blob_entries[1].size(), 1); + ASSERT_EQ(total_size, 11); check_for_blob_id_1(blob_entries[1]); - check_for_blob_id_2(blob_entries[2]); - check_for_blob_id_3(blob_entries[3]); } // {1, 2, 3, 100}; blob_id 100 is not exist in actual { std::map blob_entries; - PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2, 4}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries); - - ASSERT_EQ(blob_entries.size(), 3); // 100 not exist - ASSERT_EQ(blob_entries.find(100), blob_entries.end()); - ASSERT_EQ(blob_entries[1].size(), 4); - ASSERT_EQ(blob_entries[2].size(), 2); - ASSERT_EQ(blob_entries[3].size(), 2); - ASSERT_EQ(total_size, (1 + 2 + 5 + 11) + (6 + 8) + (3 + 4)); + std::map> rewrite; + PageSize total_size = entries.getEntriesByBlobIds({1, 3, 2, 4}, buildV3Id(TEST_NAMESPACE_ID, page_id), blob_entries, rewrite); + + ASSERT_EQ(blob_entries.size(), 1); + ASSERT_EQ(blob_entries[1].size(), 1); + ASSERT_EQ(total_size, 11); check_for_blob_id_1(blob_entries[1]); - check_for_blob_id_2(blob_entries[2]); - check_for_blob_id_3(blob_entries[3]); } }