Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 34 additions & 27 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1023,26 +1023,10 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
}
LOG_FMT_INFO(log, "BlobStore gc will migrate {:.2f}MB into new Blobs", (1.0 * total_page_size / DB::MB));

const auto config_file_limit = config.file_limit_size.get();
auto alloc_size = total_page_size > config_file_limit ? config_file_limit : total_page_size;
BlobFileOffset remaining_page_size = total_page_size - alloc_size;

// We could make the memory consumption smooth during GC.
char * data_buf = static_cast<char *>(alloc(alloc_size));
SCOPE_EXIT({
free(data_buf, alloc_size);
});

char * data_pos = data_buf;
BlobFileOffset offset_in_data = 0;
BlobFileId blobfile_id;
BlobFileOffset file_offset_beg;
std::tie(blobfile_id, file_offset_beg) = getPosFromStats(alloc_size);

auto write_blob = [this, total_page_size, &written_blobs, &write_limiter](const BlobFileId & file_id,
char * data_beg,
char * data_begin,
const BlobFileOffset & file_offset,
const BlobFileOffset & data_size) {
const PageSize & data_size) {
try
{
auto blob_file = getBlobFile(file_id);
Expand All @@ -1056,7 +1040,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
file_offset,
data_size,
total_page_size);
blob_file->write(data_beg, file_offset, data_size, write_limiter, /*background*/ true);
blob_file->write(data_begin, file_offset, data_size, write_limiter, /*background*/ true);
}
catch (DB::Exception & e)
{
Expand All @@ -1075,6 +1059,23 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
}
};

const auto config_file_limit = config.file_limit_size.get();
// If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s to
// make the memory consumption smooth during GC.
auto alloc_size = total_page_size > config_file_limit ? config_file_limit : total_page_size;
BlobFileOffset remaining_page_size = total_page_size - alloc_size;

char * data_buf = static_cast<char *>(alloc(alloc_size));
SCOPE_EXIT({
free(data_buf, alloc_size);
});

char * data_pos = data_buf;
BlobFileOffset offset_in_data = 0;
BlobFileId blobfile_id;
BlobFileOffset file_offset_begin;
std::tie(blobfile_id, file_offset_begin) = getPosFromStats(alloc_size);

// blob_file_0, [<page_id_0, ver0, entry0>,
// <page_id_0, ver1, entry1>,
// <page_id_1, ver1, entry1>, ... ]
Expand All @@ -1084,11 +1085,16 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
{
for (const auto & [page_id, versioned, entry] : versioned_pageid_entry_list)
{
// When we can't load the remaining data.
// we will use the original buffer to find an area to load the remaining data
if (offset_in_data + entry.size > config_file_limit)
/// If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s.
/// So there may be some page entries that cannot fit into the current blob file, and we need to write them into the next one.
/// We need to perform the following steps before writing data into the current blob file:
/// 1. reclaim unneeded space allocated from current blob stat if `offset_in_data` < `alloc_size`;
/// 2. update `remaining_page_size`;
/// After writing data into the current blob file, we reuse the original buffer for future write.
if (offset_in_data + entry.size > alloc_size)
{
assert(file_offset_beg == 0);
assert(alloc_size == config_file_limit);
assert(file_offset_begin == 0);
// Remove the span that is not actually used
if (offset_in_data != alloc_size)
{
Expand All @@ -1097,7 +1103,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
remaining_page_size += alloc_size - offset_in_data;

// Write data into Blob.
write_blob(blobfile_id, data_buf, file_offset_beg, offset_in_data);
write_blob(blobfile_id, data_buf, file_offset_begin, offset_in_data);

// Reset the position to reuse the buffer allocated
data_pos = data_buf;
Expand All @@ -1106,7 +1112,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
// Acquire a span from stats for remaining data
auto next_alloc_size = (remaining_page_size > config_file_limit ? config_file_limit : remaining_page_size);
remaining_page_size -= next_alloc_size;
std::tie(blobfile_id, file_offset_beg) = getPosFromStats(next_alloc_size);
std::tie(blobfile_id, file_offset_begin) = getPosFromStats(next_alloc_size);
}

PageEntryV3 new_entry;
Expand All @@ -1123,7 +1129,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
new_entry.size = entry.size;

new_entry.file_id = blobfile_id;
new_entry.offset = file_offset_beg + offset_in_data;
new_entry.offset = file_offset_begin + offset_in_data;

offset_in_data += new_entry.size;
data_pos += new_entry.size;
Expand All @@ -1132,9 +1138,10 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
}
}

// write remaining data in `data_buf` into BlobFile
if (offset_in_data != 0)
{
write_blob(blobfile_id, data_buf, file_offset_beg, offset_in_data);
write_blob(blobfile_id, data_buf, file_offset_begin, offset_in_data);
}

return edit;
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Page/V3/LogFile/LogFilename.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ struct LogFilename

static LogFilename parseFrom(const String & parent_path, const String & filename, LoggerPtr log);

inline String filename(LogFileStage stage) const
inline String filename(LogFileStage file_stage) const
{
assert(stage != LogFileStage::Invalid);
assert(file_stage != LogFileStage::Invalid);
return fmt::format(
"{}_{}_{}",
((stage == LogFileStage::Temporary) ? LOG_FILE_PREFIX_TEMP : LOG_FILE_PREFIX_NORMAL),
((file_stage == LogFileStage::Temporary) ? LOG_FILE_PREFIX_TEMP : LOG_FILE_PREFIX_NORMAL),
log_num,
level_num);
}

inline String fullname(LogFileStage stage) const
inline String fullname(LogFileStage file_stage) const
{
assert(stage != LogFileStage::Invalid);
assert(file_stage != LogFileStage::Invalid);
assert(!parent_path.empty());
return fmt::format("{}/{}", parent_path, filename(stage));
return fmt::format("{}/{}", parent_path, filename(file_stage));
}
};

Expand Down
67 changes: 42 additions & 25 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt
ErrorCodes::LOGICAL_ERROR);
}
// create a new version that inherits the `being_ref_count` of the last entry
entries.emplace(ver, EntryOrDelete::newRepalcingEntry(last_iter->second, entry));
entries.emplace(ver, EntryOrDelete::newReplacingEntry(last_iter->second, entry));
}
return;
}
Expand Down Expand Up @@ -152,7 +152,7 @@ std::shared_ptr<PageIdV3Internal> VersionedPageEntries::createNewExternal(const
}
else
{
// apply a external with smaller ver than delete_ver, just ignore
// apply an external with smaller ver than delete_ver, just ignore
return nullptr;
}
}
Expand Down Expand Up @@ -479,7 +479,8 @@ bool VersionedPageEntries::cleanOutdatedEntries(
UInt64 lowest_seq,
std::map<PageIdV3Internal, std::pair<PageVersion, Int64>> * normal_entries_to_deref,
PageEntriesV3 & entries_removed,
const PageLock & /*page_lock*/)
const PageLock & /*page_lock*/,
bool keep_last_valid_var_entry)
{
if (type == EditRecordType::VAR_EXTERNAL)
{
Expand Down Expand Up @@ -525,8 +526,14 @@ bool VersionedPageEntries::cleanOutdatedEntries(
}

// If the first version less than <lowest_seq+1, 0> is entry / external,
// then we can remove those entries prev of it
bool keep_if_being_ref = !iter->second.isEntry();
// then we can remove those entries prev of it.
// If the first version less than <lowest_seq+1, 0> is delete,
// we may keep the first valid entry before the delete entry in the following case:
// 1) if `keep_last_valid_var_entry` is true
// (this is only used when dumping a snapshot, because there may be some upsert entries in later WAL files,
// so we need to keep the last valid entry here to avoid the delete entry being removed)
// 2) if `being_ref_count` > 1 (this means the entry is referenced by other entries)
bool last_entry_is_delete = !iter->second.isEntry();
--iter; // keep the first version less than <lowest_seq+1, 0>
while (true)
{
Expand All @@ -537,16 +544,16 @@ bool VersionedPageEntries::cleanOutdatedEntries(
}
else if (iter->second.isEntry())
{
if (keep_if_being_ref)
if (last_entry_is_delete)
{
if (iter->second.being_ref_count == 1)
if (!keep_last_valid_var_entry && iter->second.being_ref_count == 1)
{
entries_removed.emplace_back(iter->second.entry);
iter = entries.erase(iter);
}
// The `being_ref_count` for this version is valid. For older versions,
// their `being_ref_count` is invalid, so they don't need to be kept
keep_if_being_ref = false;
last_entry_is_delete = false;
}
else
{
Expand All @@ -564,7 +571,7 @@ bool VersionedPageEntries::cleanOutdatedEntries(
return entries.empty() || (entries.size() == 1 && entries.begin()->second.isDelete());
}

bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 & entries_removed)
bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 & entries_removed, bool keep_last_valid_var_entry)
{
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_EXTERNAL)
Expand Down Expand Up @@ -601,7 +608,7 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag

// Clean outdated entries after decreased the ref-counter
// set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries
return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock);
return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock, keep_last_valid_var_entry);
}

throw Exception(fmt::format("calling derefAndClean with invalid state [state={}]", toDebugString()));
Expand Down Expand Up @@ -659,14 +666,22 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageIdV3Internal p
}
auto last_version = last_iter->first;
auto prev_iter = --last_iter; // Note that `last_iter` should not be used anymore
if (prev_iter->second.isEntry())
while (true)
{
if (prev_iter->second.being_ref_count == 1)
return;
// It is being ref by another id, should persist the item and delete
const auto & entry = prev_iter->second;
edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count);
edit.varDel(page_id, last_version);
// If there is any entry prior to this delete entry,
// 1) the entry may be referenced by another id.
// 2) the entry may be upserted into a newer WAL file by the gc process.
// So we need to keep the entry item and its delete entry in the snapshot.
if (prev_iter->second.isEntry())
{
const auto & entry = prev_iter->second;
edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count);
edit.varDel(page_id, last_version);
break;
}
if (prev_iter == entries.begin())
break;
prev_iter--;
}
}
return;
Expand All @@ -691,7 +706,6 @@ PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_, UInt64 ma
, wal(std::move(wal_))
, max_persisted_log_files(max_persisted_log_files_)
, log(Logger::get("PageDirectory", std::move(storage_name)))

{
}

Expand Down Expand Up @@ -969,7 +983,7 @@ void PageDirectory::applyRefEditRecord(
const PageVersion & version)
{
// applying ref 3->2, existing ref 2->1, normal entry 1, then we should collapse
// the ref to be 3->1, increase the refcounting of normale entry 1
// the ref to be 3->1, increase the refcounting of normal entry 1
auto [resolve_success, resolved_id, resolved_ver] = [&mvcc_table_directory](PageIdV3Internal id_to_resolve, PageVersion ver_to_resolve)
-> std::tuple<bool, PageIdV3Internal, PageVersion> {
while (true)
Expand Down Expand Up @@ -1210,12 +1224,12 @@ PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_ids) con
return std::make_pair(std::move(blob_versioned_entries), total_page_size);
}

bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter)
bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter, bool force)
{
bool done_any_io = false;
// In order not to make read amplification too high, only apply compact logs when ...
auto files_snap = wal->getFilesSnapshot();
if (files_snap.needSave(max_persisted_log_files))
if (files_snap.needSave(max_persisted_log_files) || (force && (!files_snap.persisted_log_files.empty())))
{
// To prevent writes from affecting dumping snapshot (and vice versa), old log files
// are read from disk and a temporary PageDirectory is generated for dumping snapshot.
Expand All @@ -1231,15 +1245,16 @@ bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const W
PageDirectoryPtr collapsed_dir = factory.createFromReader(
identifier,
std::move(snapshot_reader),
/*wal=*/nullptr);
/* wal */ nullptr,
/* for_dump_snapshot */ true);
// The records persisted in `files_snap` are older than or equal to all records in `edit`
auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit();
done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit_from_disk), write_limiter);
}
return done_any_io;
}

PageEntriesV3 PageDirectory::gcInMemEntries()
PageEntriesV3 PageDirectory::gcInMemEntries(bool keep_last_valid_var_entry)
{
UInt64 lowest_seq = sequence.load();

Expand Down Expand Up @@ -1304,7 +1319,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries()
lowest_seq,
&normal_entries_to_deref,
all_del_entries,
iter->second->acquireLock());
iter->second->acquireLock(),
keep_last_valid_var_entry);

{
std::unique_lock write_lock(table_rw_mutex);
Expand Down Expand Up @@ -1342,7 +1358,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries()
page_id,
/*deref_ver=*/deref_counter.first,
/*deref_count=*/deref_counter.second,
all_del_entries);
all_del_entries,
keep_last_valid_var_entry);

if (all_deleted)
{
Expand Down
18 changes: 13 additions & 5 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ struct EntryOrDelete
.entry = entry,
};
}
static EntryOrDelete newRepalcingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry)
static EntryOrDelete newReplacingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry)
{
return EntryOrDelete{
.is_delete = false,
Expand Down Expand Up @@ -224,13 +224,15 @@ class VersionedPageEntries
UInt64 lowest_seq,
std::map<PageIdV3Internal, std::pair<PageVersion, Int64>> * normal_entries_to_deref,
PageEntriesV3 & entries_removed,
const PageLock & page_lock);
const PageLock & page_lock,
bool keep_last_valid_var_entry = false);
bool derefAndClean(
UInt64 lowest_seq,
PageIdV3Internal page_id,
const PageVersion & deref_ver,
Int64 deref_count,
PageEntriesV3 & entries_removed);
PageEntriesV3 & entries_removed,
bool keep_last_valid_var_entry = false);

void collapseTo(UInt64 seq, PageIdV3Internal page_id, PageEntriesEdit & edit);

Expand Down Expand Up @@ -358,9 +360,15 @@ class PageDirectory

void gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter = nullptr);

bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr);
/// When creating a PageDirectory for dumping a snapshot, we should keep the last valid var_entry when it is deleted.
/// Because there may be some upsert entries in later WAL files, we should keep both the valid var_entry and the delete entry so that the later upsert entries can be deleted.
/// We don't restore the entries in the blob store, because this PageDirectory is read-only with respect to its entries.
bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr, bool force = false);

PageEntriesV3 gcInMemEntries();
// Perform a GC for in-memory entries and return the removed entries.
// If `keep_last_valid_var_entry` is true, the last valid entry before a delete entry is kept instead of being removed.
// When dumping a snapshot, we need to keep the last valid entry. Check out `tryDumpSnapshot` for the reason.
PageEntriesV3 gcInMemEntries(bool keep_last_valid_var_entry = false);

std::set<PageId> getAliveExternalIds(NamespaceId ns_id) const;

Expand Down
Loading