From 265dab5fd1e27b86ae9dd20394fa4f70a591e275 Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Thu, 14 Jul 2022 17:35:06 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #5357 Signed-off-by: ti-chi-bot --- dbms/src/Storages/Page/V3/BlobStore.cpp | 64 +++++--- .../Storages/Page/V3/LogFile/LogFilename.h | 12 +- dbms/src/Storages/Page/V3/PageDirectory.cpp | 79 +++++++--- dbms/src/Storages/Page/V3/PageDirectory.h | 25 ++- .../Storages/Page/V3/PageDirectoryFactory.cpp | 9 +- .../Storages/Page/V3/PageDirectoryFactory.h | 2 +- .../Page/V3/tests/gtest_page_storage.cpp | 148 +++++++++++++++++- .../Storages/Page/V3/tests/gtest_wal_log.cpp | 9 -- 8 files changed, 280 insertions(+), 68 deletions(-) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 4a203199473..3d7cbcebd04 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -1012,26 +1012,10 @@ PageEntriesEdit BlobStore::gc(std::map & } LOG_FMT_INFO(log, "BlobStore gc will migrate {:.2f}MB into new Blobs", (1.0 * total_page_size / DB::MB)); - const auto config_file_limit = config.file_limit_size.get(); - auto alloc_size = total_page_size > config_file_limit ? config_file_limit : total_page_size; - BlobFileOffset remaining_page_size = total_page_size - alloc_size; - - // We could make the memory consumption smooth during GC. - char * data_buf = static_cast(alloc(alloc_size)); - SCOPE_EXIT({ - free(data_buf, alloc_size); - }); - - char * data_pos = data_buf; - BlobFileOffset offset_in_data = 0; - BlobFileId blobfile_id; - BlobFileOffset file_offset_beg; - std::tie(blobfile_id, file_offset_beg) = getPosFromStats(alloc_size); - auto write_blob = [this, total_page_size, &written_blobs, &write_limiter](const BlobFileId & file_id, - char * data_beg, + char * data_begin, const BlobFileOffset & file_offset, - const BlobFileOffset & data_size) { + const PageSize & data_size) { try { auto blob_file = getBlobFile(file_id); @@ -1045,7 +1029,7 @@ PageEntriesEdit BlobStore::gc(std::map & file_offset, data_size, total_page_size); - blob_file->write(data_beg, file_offset, data_size, write_limiter, /*background*/ true); + blob_file->write(data_begin, file_offset, data_size, write_limiter, /*background*/ true); } catch (DB::Exception & e) { @@ -1064,6 +1048,23 @@ PageEntriesEdit BlobStore::gc(std::map & } }; + const auto config_file_limit = config.file_limit_size.get(); + // If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s to + // make the memory consumption smooth during GC. + auto alloc_size = total_page_size > config_file_limit ? config_file_limit : total_page_size; + BlobFileOffset remaining_page_size = total_page_size - alloc_size; + + char * data_buf = static_cast(alloc(alloc_size)); + SCOPE_EXIT({ + free(data_buf, alloc_size); + }); + + char * data_pos = data_buf; + BlobFileOffset offset_in_data = 0; + BlobFileId blobfile_id; + BlobFileOffset file_offset_begin; + std::tie(blobfile_id, file_offset_begin) = getPosFromStats(alloc_size); + // blob_file_0, [, // , // , ... ] @@ -1073,11 +1074,16 @@ PageEntriesEdit BlobStore::gc(std::map & { for (const auto & [page_id, versioned, entry] : versioned_pageid_entry_list) { - // When we can't load the remaining data. - // we will use the original buffer to find an area to load the remaining data - if (offset_in_data + entry.size > config_file_limit) + /// If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s. + /// So there may be some page entry that cannot be fit into the current blob file, and we need to write it into the next one. + /// And we need perform the following steps before writing data into the current blob file: + /// 1. reclaim unneeded space allocated from current blob stat if `offset_in_data` < `alloc_size`; + /// 2. update `remaining_page_size`; + /// After writing data into the current blob file, we reuse the original buffer for future write. + if (offset_in_data + entry.size > alloc_size) { - assert(file_offset_beg == 0); + assert(alloc_size == config_file_limit); + assert(file_offset_begin == 0); // Remove the span that is not actually used if (offset_in_data != alloc_size) { @@ -1086,7 +1092,7 @@ PageEntriesEdit BlobStore::gc(std::map & remaining_page_size += alloc_size - offset_in_data; // Write data into Blob. - write_blob(blobfile_id, data_buf, file_offset_beg, offset_in_data); + write_blob(blobfile_id, data_buf, file_offset_begin, offset_in_data); // Reset the position to reuse the buffer allocated data_pos = data_buf; @@ -1095,7 +1101,7 @@ PageEntriesEdit BlobStore::gc(std::map & // Acquire a span from stats for remaining data auto next_alloc_size = (remaining_page_size > config_file_limit ? config_file_limit : remaining_page_size); remaining_page_size -= next_alloc_size; - std::tie(blobfile_id, file_offset_beg) = getPosFromStats(next_alloc_size); + std::tie(blobfile_id, file_offset_begin) = getPosFromStats(next_alloc_size); } PageEntryV3 new_entry; @@ -1112,7 +1118,12 @@ PageEntriesEdit BlobStore::gc(std::map & new_entry.size = entry.size; new_entry.file_id = blobfile_id; +<<<<<<< HEAD new_entry.offset = file_offset_beg + offset_in_data; +======= + new_entry.offset = file_offset_begin + offset_in_data; + new_entry.padded_size = 0; // reset padded size to be zero +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) offset_in_data += new_entry.size; data_pos += new_entry.size; @@ -1121,9 +1132,10 @@ PageEntriesEdit BlobStore::gc(std::map & } } + // write remaining data in `data_buf` into BlobFile if (offset_in_data != 0) { - write_blob(blobfile_id, data_buf, file_offset_beg, offset_in_data); + write_blob(blobfile_id, data_buf, file_offset_begin, offset_in_data); } return edit; diff --git a/dbms/src/Storages/Page/V3/LogFile/LogFilename.h b/dbms/src/Storages/Page/V3/LogFile/LogFilename.h index 6c4c3621caf..774029353bc 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogFilename.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogFilename.h @@ -39,21 +39,21 @@ struct LogFilename static LogFilename parseFrom(const String & parent_path, const String & filename, LoggerPtr log); - inline String filename(LogFileStage stage) const + inline String filename(LogFileStage file_stage) const { - assert(stage != LogFileStage::Invalid); + assert(file_stage != LogFileStage::Invalid); return fmt::format( "{}_{}_{}", - ((stage == LogFileStage::Temporary) ? LOG_FILE_PREFIX_TEMP : LOG_FILE_PREFIX_NORMAL), + ((file_stage == LogFileStage::Temporary) ? LOG_FILE_PREFIX_TEMP : LOG_FILE_PREFIX_NORMAL), log_num, level_num); } - inline String fullname(LogFileStage stage) const + inline String fullname(LogFileStage file_stage) const { - assert(stage != LogFileStage::Invalid); + assert(file_stage != LogFileStage::Invalid); assert(!parent_path.empty()); - return fmt::format("{}/{}", parent_path, filename(stage)); + return fmt::format("{}/{}", parent_path, filename(file_stage)); } }; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index e9b754854b8..fa57f678c7c 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -103,7 +103,7 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt ErrorCodes::LOGICAL_ERROR); } // create a new version that inherit the `being_ref_count` of the last entry - entries.emplace(ver, EntryOrDelete::newRepalcingEntry(last_iter->second, entry)); + entries.emplace(ver, EntryOrDelete::newReplacingEntry(last_iter->second, entry)); } return; } @@ -152,7 +152,7 @@ std::shared_ptr VersionedPageEntries::createNewExternal(const } else { - // apply a external with smaller ver than delete_ver, just ignore + // apply an external with smaller ver than delete_ver, just ignore return nullptr; } } @@ -478,8 +478,14 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( bool VersionedPageEntries::cleanOutdatedEntries( UInt64 lowest_seq, std::map> * normal_entries_to_deref, +<<<<<<< HEAD PageEntriesV3 & entries_removed, const PageLock & /*page_lock*/) +======= + PageEntriesV3 * entries_removed, + const PageLock & /*page_lock*/, + bool keep_last_valid_var_entry) +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) { if (type == EditRecordType::VAR_EXTERNAL) { @@ -525,8 +531,14 @@ bool VersionedPageEntries::cleanOutdatedEntries( } // If the first version less than is entry / external, - // then we can remove those entries prev of it - bool keep_if_being_ref = !iter->second.isEntry(); + // then we can remove those entries prev of it. + // If the first version less than is delete, + // we may keep the first valid entry before the delete entry in the following case: + // 1) if `keep_last_valid_var_entry` is true + // (this is only used when dump snapshot because there may be some upsert entry in later wal files, + // so we need keep the last valid entry here to avoid the delete entry being removed) + // 2) if `being_ref_count` > 1(this means the entry is ref by other entries) + bool last_entry_is_delete = !iter->second.isEntry(); --iter; // keep the first version less than while (true) { @@ -537,16 +549,16 @@ bool VersionedPageEntries::cleanOutdatedEntries( } else if (iter->second.isEntry()) { - if (keep_if_being_ref) + if (last_entry_is_delete) { - if (iter->second.being_ref_count == 1) + if (!keep_last_valid_var_entry && iter->second.being_ref_count == 1) { entries_removed.emplace_back(iter->second.entry); iter = entries.erase(iter); } // The `being_ref_count` for this version is valid. While for older versions, // theirs `being_ref_count` is invalid, don't need to be kept - keep_if_being_ref = false; + last_entry_is_delete = false; } else { @@ -564,7 +576,11 @@ bool VersionedPageEntries::cleanOutdatedEntries( return entries.empty() || (entries.size() == 1 && entries.begin()->second.isDelete()); } +<<<<<<< HEAD bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 & entries_removed) +======= +bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 * entries_removed, bool keep_last_valid_var_entry) +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_EXTERNAL) @@ -601,7 +617,7 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag // Clean outdated entries after decreased the ref-counter // set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries - return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock); + return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock, keep_last_valid_var_entry); } throw Exception(fmt::format("calling derefAndClean with invalid state [state={}]", toDebugString())); @@ -659,14 +675,22 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageIdV3Internal p } auto last_version = last_iter->first; auto prev_iter = --last_iter; // Note that `last_iter` should not be used anymore - if (prev_iter->second.isEntry()) + while (true) { - if (prev_iter->second.being_ref_count == 1) - return; - // It is being ref by another id, should persist the item and delete - const auto & entry = prev_iter->second; - edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count); - edit.varDel(page_id, last_version); + // if there is any entry prev to this delete entry, + // 1) the entry may be ref by another id. + // 2) the entry may be upsert into a newer wal file by the gc process. + // So we need to keep the entry item and its delete entry in the snapshot. + if (prev_iter->second.isEntry()) + { + const auto & entry = prev_iter->second; + edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count); + edit.varDel(page_id, last_version); + break; + } + if (prev_iter == entries.begin()) + break; + prev_iter--; } } return; @@ -691,7 +715,6 @@ PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_, UInt64 ma , wal(std::move(wal_)) , max_persisted_log_files(max_persisted_log_files_) , log(Logger::get("PageDirectory", std::move(storage_name))) - { } @@ -969,7 +992,7 @@ void PageDirectory::applyRefEditRecord( const PageVersion & version) { // applying ref 3->2, existing ref 2->1, normal entry 1, then we should collapse - // the ref to be 3->1, increase the refcounting of normale entry 1 + // the ref to be 3->1, increase the refcounting of normal entry 1 auto [resolve_success, resolved_id, resolved_ver] = [&mvcc_table_directory](PageIdV3Internal id_to_resolve, PageVersion ver_to_resolve) -> std::tuple { while (true) @@ -1210,12 +1233,12 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_ids) con return std::make_pair(std::move(blob_versioned_entries), total_page_size); } -bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter) +bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter, bool force) { bool done_any_io = false; // In order not to make read amplification too high, only apply compact logs when ... auto files_snap = wal->getFilesSnapshot(); - if (files_snap.needSave(max_persisted_log_files)) + if (files_snap.needSave(max_persisted_log_files) || (force && (!files_snap.persisted_log_files.empty()))) { // To prevent writes from affecting dumping snapshot (and vice versa), old log files // are read from disk and a temporary PageDirectory is generated for dumping snapshot. @@ -1231,7 +1254,8 @@ bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const W PageDirectoryPtr collapsed_dir = factory.createFromReader( identifier, std::move(snapshot_reader), - /*wal=*/nullptr); + /* wal */ nullptr, + /* for_dump_snapshot */ true); // The records persisted in `files_snap` is older than or equal to all records in `edit` auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit(); done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit_from_disk), write_limiter); @@ -1239,7 +1263,11 @@ bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const W return done_any_io; } +<<<<<<< HEAD PageEntriesV3 PageDirectory::gcInMemEntries() +======= +PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries, bool keep_last_valid_var_entry) +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) { UInt64 lowest_seq = sequence.load(); @@ -1303,8 +1331,14 @@ PageEntriesV3 PageDirectory::gcInMemEntries() const bool all_deleted = iter->second->cleanOutdatedEntries( lowest_seq, &normal_entries_to_deref, +<<<<<<< HEAD all_del_entries, iter->second->acquireLock()); +======= + return_removed_entries ? &all_del_entries : nullptr, + iter->second->acquireLock(), + keep_last_valid_var_entry); +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) { std::unique_lock write_lock(table_rw_mutex); @@ -1342,7 +1376,12 @@ PageEntriesV3 PageDirectory::gcInMemEntries() page_id, /*deref_ver=*/deref_counter.first, /*deref_count=*/deref_counter.second, +<<<<<<< HEAD all_del_entries); +======= + return_removed_entries ? &all_del_entries : nullptr, + keep_last_valid_var_entry); +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) if (all_deleted) { diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 39b5a05a40a..b09af837f1f 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -98,7 +98,7 @@ struct EntryOrDelete .entry = entry, }; } - static EntryOrDelete newRepalcingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry) + static EntryOrDelete newReplacingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry) { return EntryOrDelete{ .is_delete = false, @@ -223,14 +223,25 @@ class VersionedPageEntries bool cleanOutdatedEntries( UInt64 lowest_seq, std::map> * normal_entries_to_deref, +<<<<<<< HEAD PageEntriesV3 & entries_removed, const PageLock & page_lock); +======= + PageEntriesV3 * entries_removed, + const PageLock & page_lock, + bool keep_last_valid_var_entry = false); +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) bool derefAndClean( UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, Int64 deref_count, +<<<<<<< HEAD PageEntriesV3 & entries_removed); +======= + PageEntriesV3 * entries_removed, + bool keep_last_valid_var_entry = false); +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) void collapseTo(UInt64 seq, PageIdV3Internal page_id, PageEntriesEdit & edit); @@ -358,9 +369,19 @@ class PageDirectory void gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter = nullptr); - bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr); + /// When create PageDirectory for dump snapshot, we should keep the last valid var_entry when it is deleted. + /// Because there may be some upsert entry in later wal files, and we should keep the valid var_entry and the delete entry to delete the later upsert entry. + /// And we don't restore the entries in blob store, because this PageDirectory is just read only for its entries. + bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr, bool force = false); +<<<<<<< HEAD PageEntriesV3 gcInMemEntries(); +======= + // Perform a GC for in-memory entries and return the removed entries. + // If `return_removed_entries` is false, then just return an empty set. + // When dump snapshot, we need to keep the last valid entry. Check out `tryDumpSnapshot` for the reason. + PageEntriesV3 gcInMemEntries(bool return_removed_entries = true, bool keep_last_valid_var_entry = false); +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) std::set getAliveExternalIds(NamespaceId ns_id) const; diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 483c5073ab5..29dbc03745f 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -34,7 +34,7 @@ PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderP return createFromReader(storage_name, reader, std::move(wal)); } -PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal) +PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal, bool for_dump_snapshot) { PageDirectoryPtr dir = std::make_unique(storage_name, std::move(wal)); loadFromDisk(dir, std::move(reader)); @@ -44,10 +44,15 @@ PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WAL // After restoring from the disk, we need cleanup all invalid entries in memory, or it will // try to run GC again on some entries that are already marked as invalid in BlobStore. +<<<<<<< HEAD dir->gcInMemEntries(); +======= + // It's no need to remove the expired entries in BlobStore, so skip filling removed_entries to improve performance. + dir->gcInMemEntries(/*return_removed_entries=*/false, /* keep_last_delete_entry */ for_dump_snapshot); +>>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory", storage_name), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence); - if (blob_stats) + if (!for_dump_snapshot && blob_stats) { // After all entries restored to `mvcc_table_directory`, only apply // the latest entry to `blob_stats`, or we may meet error since diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h index a922db3b497..97a9d0da8d6 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h @@ -48,7 +48,7 @@ class PageDirectoryFactory PageDirectoryPtr create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config); - PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal); + PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal, bool for_dump_snapshot = false); // just for test PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index ce2ba0adaf4..f69066b26e5 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -52,7 +53,7 @@ class PageStorageTest : public DB::base::TiFlashStorageTestBasic auto path = getTemporaryPath(); createIfNotExist(path); file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); - auto delegator = std::make_shared(path); + delegator = std::make_shared(path); page_storage = std::make_shared("test.t", delegator, config, file_provider); page_storage->restore(); } @@ -60,7 +61,7 @@ class PageStorageTest : public DB::base::TiFlashStorageTestBasic std::shared_ptr reopenWithConfig(const PageStorage::Config & config_) { auto path = getTemporaryPath(); - auto delegator = std::make_shared(path); + delegator = std::make_shared(path); auto storage = std::make_shared("test.t", delegator, config_, file_provider); storage->restore(); return storage; @@ -70,6 +71,7 @@ class PageStorageTest : public DB::base::TiFlashStorageTestBasic protected: FileProviderPtr file_provider; std::unique_ptr path_pool; + PSDiskDelegatorPtr delegator; PageStorage::Config config; std::shared_ptr page_storage; @@ -1409,5 +1411,147 @@ try CATCH +TEST_F(PageStorageTest, DumpPageStorageSnapshot) +try +{ + { + PageStorage::Config config; + config.blob_heavy_gc_valid_rate = 1.0; /// always run full gc + config.wal_roll_size = 1 * 1024 * 1024; /// make the wal file more easy to roll + config.wal_max_persisted_log_files = 10; /// avoid checkpoint when gc + page_storage = reopenWithConfig(config); + } + + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + + for (size_t i = 0; i < buf_sz; ++i) + { + c_buff[i] = i % 0xff; + } + + PageId page_id0 = 120; + { + WriteBatch batch; + batch.putPage(page_id0, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); + page_storage->write(std::move(batch)); + } + + // create a snapshot to avoid gc + auto snap = page_storage->getSnapshot(); + + { + WriteBatch batch; + batch.delPage(page_id0); + page_storage->write(std::move(batch)); + } + + auto getLogFileNum = [&]() { + auto log_files = WALStoreReader::listAllFiles(delegator, Logger::get("PageStorageTest", "")); + return log_files.size(); + }; + + // write until there are more than one wal file + while (getLogFileNum() <= 1) + { + WriteBatch batch; + PageId page_id1 = 130; + batch.putPage(page_id1, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); + page_storage->write(std::move(batch)); + } + ASSERT_ANY_THROW(page_storage->read(page_id0)); + + // write an upsert entry into the current writing log file + auto done_full_gc = page_storage->gc(); + EXPECT_TRUE(done_full_gc); + + auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); + ASSERT_TRUE(done_snapshot); + + { + PageStorage::Config config; + page_storage = reopenWithConfig(config); + } + + ASSERT_ANY_THROW(page_storage->read(page_id0)); +} +CATCH + +TEST_F(PageStorageTest, DumpPageStorageSnapshotWithRefPage) +try +{ + { + PageStorage::Config config; + config.blob_heavy_gc_valid_rate = 1.0; /// always run full gc + config.wal_roll_size = 1 * 1024 * 1024; /// make the wal file more easy to roll + config.wal_max_persisted_log_files = 10; /// avoid checkpoint when gc + page_storage = reopenWithConfig(config); + } + + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + + for (size_t i = 0; i < buf_sz; ++i) + { + c_buff[i] = i % 0xff; + } + + PageId page_id0 = 120; + { + WriteBatch batch; + batch.putPage(page_id0, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); + page_storage->write(std::move(batch)); + } + PageId page_id1 = 121; + { + WriteBatch batch; + batch.putRefPage(page_id1, page_id0); + page_storage->write(std::move(batch)); + } + // create a snapshot to avoid gc + auto snap = page_storage->getSnapshot(); + + { + WriteBatch batch; + batch.delPage(page_id0); + page_storage->write(std::move(batch)); + } + { + WriteBatch batch; + batch.delPage(page_id1); + page_storage->write(std::move(batch)); + } + + auto getLogFileNum = [&]() { + auto log_files = WALStoreReader::listAllFiles(delegator, Logger::get("PageStorageTest", "")); + return log_files.size(); + }; + + // write until there are more than one wal file + while (getLogFileNum() <= 1) + { + WriteBatch batch; + PageId page_id2 = 130; + batch.putPage(page_id2, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); + page_storage->write(std::move(batch)); + } + ASSERT_ANY_THROW(page_storage->read(page_id0)); + + // write an upsert entry into the current writing log file + auto done_full_gc = page_storage->gc(); + EXPECT_TRUE(done_full_gc); + + auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); + ASSERT_TRUE(done_snapshot); + + { + PageStorage::Config config; + page_storage = reopenWithConfig(config); + } + + ASSERT_ANY_THROW(page_storage->read(page_id0)); +} +CATCH + } // namespace PS::V3::tests } // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp index 0f1406f57fc..c4d3e2f7ab7 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp @@ -12,15 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). -// -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - #include #include #include From dd4f673e6d0bfb38183fab27c01b6c99a2b11197 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 14 Jul 2022 17:47:26 +0800 Subject: [PATCH 2/3] fix conflict --- dbms/src/Storages/Page/V3/BlobStore.cpp | 5 ---- dbms/src/Storages/Page/V3/PageDirectory.cpp | 28 ++----------------- dbms/src/Storages/Page/V3/PageDirectory.h | 17 ++--------- .../Storages/Page/V3/PageDirectoryFactory.cpp | 8 ++---- 4 files changed, 7 insertions(+), 51 deletions(-) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 3d7cbcebd04..bf9e56c4a58 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -1118,12 +1118,7 @@ PageEntriesEdit BlobStore::gc(std::map & new_entry.size = entry.size; new_entry.file_id = blobfile_id; -<<<<<<< HEAD - new_entry.offset = file_offset_beg + offset_in_data; -======= new_entry.offset = file_offset_begin + offset_in_data; - new_entry.padded_size = 0; // reset padded size to be zero ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) offset_in_data += new_entry.size; data_pos += new_entry.size; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index fa57f678c7c..500e7f4c1d6 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -478,14 +478,9 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( bool VersionedPageEntries::cleanOutdatedEntries( UInt64 lowest_seq, std::map> * normal_entries_to_deref, -<<<<<<< HEAD PageEntriesV3 & entries_removed, - const PageLock & /*page_lock*/) -======= - PageEntriesV3 * entries_removed, const PageLock & /*page_lock*/, bool keep_last_valid_var_entry) ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) { if (type == EditRecordType::VAR_EXTERNAL) { @@ -576,11 +571,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( return entries.empty() || (entries.size() == 1 && entries.begin()->second.isDelete()); } -<<<<<<< HEAD -bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 & entries_removed) -======= -bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 * entries_removed, bool keep_last_valid_var_entry) ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) +bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 & entries_removed, bool keep_last_valid_var_entry) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_EXTERNAL) @@ -1263,11 +1254,7 @@ bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const W return done_any_io; } -<<<<<<< HEAD -PageEntriesV3 PageDirectory::gcInMemEntries() -======= -PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries, bool keep_last_valid_var_entry) ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) +PageEntriesV3 PageDirectory::gcInMemEntries(bool keep_last_valid_var_entry) { UInt64 lowest_seq = sequence.load(); @@ -1331,14 +1318,9 @@ PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries, bool ke const bool all_deleted = iter->second->cleanOutdatedEntries( lowest_seq, &normal_entries_to_deref, -<<<<<<< HEAD all_del_entries, - iter->second->acquireLock()); -======= - return_removed_entries ? &all_del_entries : nullptr, iter->second->acquireLock(), keep_last_valid_var_entry); ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) { std::unique_lock write_lock(table_rw_mutex); @@ -1376,12 +1358,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries, bool ke page_id, /*deref_ver=*/deref_counter.first, /*deref_count=*/deref_counter.second, -<<<<<<< HEAD - all_del_entries); -======= - return_removed_entries ? &all_del_entries : nullptr, + all_del_entries, keep_last_valid_var_entry); ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) if (all_deleted) { diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index b09af837f1f..95bd5803ee4 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -223,25 +223,16 @@ class VersionedPageEntries bool cleanOutdatedEntries( UInt64 lowest_seq, std::map> * normal_entries_to_deref, -<<<<<<< HEAD PageEntriesV3 & entries_removed, - const PageLock & page_lock); -======= - PageEntriesV3 * entries_removed, const PageLock & page_lock, bool keep_last_valid_var_entry = false); ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) bool derefAndClean( UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, Int64 deref_count, -<<<<<<< HEAD - PageEntriesV3 & entries_removed); -======= - PageEntriesV3 * entries_removed, + PageEntriesV3 & entries_removed, bool keep_last_valid_var_entry = false); ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) void collapseTo(UInt64 seq, PageIdV3Internal page_id, PageEntriesEdit & edit); @@ -374,14 +365,10 @@ class PageDirectory /// And we don't restore the entries in blob store, because this PageDirectory is just read only for its entries. bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr, bool force = false); -<<<<<<< HEAD - PageEntriesV3 gcInMemEntries(); -======= // Perform a GC for in-memory entries and return the removed entries. // If `return_removed_entries` is false, then just return an empty set. // When dump snapshot, we need to keep the last valid entry. Check out `tryDumpSnapshot` for the reason. - PageEntriesV3 gcInMemEntries(bool return_removed_entries = true, bool keep_last_valid_var_entry = false); ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) + PageEntriesV3 gcInMemEntries(bool keep_last_valid_var_entry = false); std::set getAliveExternalIds(NamespaceId ns_id) const; diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 29dbc03745f..3d1cc8d0491 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -44,12 +44,8 @@ PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WAL // After restoring from the disk, we need cleanup all invalid entries in memory, or it will // try to run GC again on some entries that are already marked as invalid in BlobStore. -<<<<<<< HEAD - dir->gcInMemEntries(); -======= - // It's no need to remove the expired entries in BlobStore, so skip filling removed_entries to improve performance. - dir->gcInMemEntries(/*return_removed_entries=*/false, /* keep_last_delete_entry */ for_dump_snapshot); ->>>>>>> c40c262576 (keep delete entry when dump snapshot (#5357)) + dir->gcInMemEntries(/* keep_last_delete_entry */ for_dump_snapshot); + LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory", storage_name), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence); if (!for_dump_snapshot && blob_stats) From e59f46648ba78f21d760c39cf5b23cdc2b6fc171 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 17 Aug 2022 19:37:05 +0800 Subject: [PATCH 3/3] format files Signed-off-by: JaySon-Huang --- dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index ab29fecaddd..456238a4cc5 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1550,7 +1550,7 @@ try } ASSERT_ANY_THROW(page_storage->read(page_id0)); - } +} CATCH TEST_F(PageStorageTest, ReloadConfig)