Skip to content
61 changes: 34 additions & 27 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1068,26 +1068,10 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
}
LOG_FMT_INFO(log, "BlobStore gc will migrate {:.2f}MB into new Blobs", (1.0 * total_page_size / DB::MB));

const auto config_file_limit = config.file_limit_size.get();
Comment thread
lidezhu marked this conversation as resolved.
auto alloc_size = total_page_size > config_file_limit ? config_file_limit : total_page_size;
BlobFileOffset remaining_page_size = total_page_size - alloc_size;

// We could make the memory consumption smooth during GC.
char * data_buf = static_cast<char *>(alloc(alloc_size));
SCOPE_EXIT({
free(data_buf, alloc_size);
});

char * data_pos = data_buf;
BlobFileOffset offset_in_data = 0;
BlobFileId blobfile_id;
BlobFileOffset file_offset_beg;
std::tie(blobfile_id, file_offset_beg) = getPosFromStats(alloc_size);

auto write_blob = [this, total_page_size, &written_blobs, &write_limiter](const BlobFileId & file_id,
char * data_beg,
char * data_begin,
const BlobFileOffset & file_offset,
const BlobFileOffset & data_size) {
const PageSize & data_size) {
try
{
auto blob_file = getBlobFile(file_id);
Expand All @@ -1101,7 +1085,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
file_offset,
data_size,
total_page_size);
blob_file->write(data_beg, file_offset, data_size, write_limiter, /*background*/ true);
Comment thread
lidezhu marked this conversation as resolved.
blob_file->write(data_begin, file_offset, data_size, write_limiter, /*background*/ true);
}
catch (DB::Exception & e)
{
Expand All @@ -1120,6 +1104,23 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
}
};

const auto config_file_limit = config.file_limit_size.get();
// If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s to
// make the memory consumption smooth during GC.
auto alloc_size = total_page_size > config_file_limit ? config_file_limit : total_page_size;
BlobFileOffset remaining_page_size = total_page_size - alloc_size;

char * data_buf = static_cast<char *>(alloc(alloc_size));
SCOPE_EXIT({
free(data_buf, alloc_size);
});

char * data_pos = data_buf;
BlobFileOffset offset_in_data = 0;
BlobFileId blobfile_id;
BlobFileOffset file_offset_begin;
std::tie(blobfile_id, file_offset_begin) = getPosFromStats(alloc_size);

// blob_file_0, [<page_id_0, ver0, entry0>,
// <page_id_0, ver1, entry1>,
// <page_id_1, ver1, entry1>, ... ]
Expand All @@ -1129,11 +1130,16 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
{
for (const auto & [page_id, versioned, entry] : versioned_pageid_entry_list)
{
// When we can't load the remaining data.
// we will use the original buffer to find an area to load the remaining data
if (offset_in_data + entry.size > config_file_limit)
/// If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s.
/// So there may be some page entry that cannot be fit into the current blob file, and we need to write it into the next one.
/// And we need perform the following steps before writing data into the current blob file:
/// 1. reclaim unneeded space allocated from current blob stat if `offset_in_data` < `alloc_size`;
/// 2. update `remaining_page_size`;
/// After writing data into the current blob file, we reuse the original buffer for future write.
if (offset_in_data + entry.size > alloc_size)
{
assert(file_offset_beg == 0);
assert(alloc_size == config_file_limit);
assert(file_offset_begin == 0);
// Remove the span that is not actually used
if (offset_in_data != alloc_size)
{
Expand All @@ -1142,7 +1148,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
remaining_page_size += alloc_size - offset_in_data;

// Write data into Blob.
write_blob(blobfile_id, data_buf, file_offset_beg, offset_in_data);
write_blob(blobfile_id, data_buf, file_offset_begin, offset_in_data);

// Reset the position to reuse the buffer allocated
data_pos = data_buf;
Expand All @@ -1151,7 +1157,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
// Acquire a span from stats for remaining data
auto next_alloc_size = (remaining_page_size > config_file_limit ? config_file_limit : remaining_page_size);
remaining_page_size -= next_alloc_size;
std::tie(blobfile_id, file_offset_beg) = getPosFromStats(next_alloc_size);
std::tie(blobfile_id, file_offset_begin) = getPosFromStats(next_alloc_size);
}

// Read the data into buffer by old entry
Expand All @@ -1161,7 +1167,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
// need to be updated.
PageEntryV3 new_entry = entry;
new_entry.file_id = blobfile_id;
new_entry.offset = file_offset_beg + offset_in_data;
new_entry.offset = file_offset_begin + offset_in_data;
new_entry.padded_size = 0; // reset padded size to be zero

offset_in_data += new_entry.size;
Expand All @@ -1171,9 +1177,10 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
}
}

// write remaining data in `data_buf` into BlobFile
if (offset_in_data != 0)
{
write_blob(blobfile_id, data_buf, file_offset_beg, offset_in_data);
write_blob(blobfile_id, data_buf, file_offset_begin, offset_in_data);
}

return edit;
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Page/V3/LogFile/LogFilename.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ struct LogFilename

static LogFilename parseFrom(const String & parent_path, const String & filename, LoggerPtr log);

inline String filename(LogFileStage stage) const
inline String filename(LogFileStage file_stage) const
{
assert(stage != LogFileStage::Invalid);
assert(file_stage != LogFileStage::Invalid);
return fmt::format(
"{}_{}_{}",
((stage == LogFileStage::Temporary) ? LOG_FILE_PREFIX_TEMP : LOG_FILE_PREFIX_NORMAL),
((file_stage == LogFileStage::Temporary) ? LOG_FILE_PREFIX_TEMP : LOG_FILE_PREFIX_NORMAL),
log_num,
level_num);
}

inline String fullname(LogFileStage stage) const
inline String fullname(LogFileStage file_stage) const
{
assert(stage != LogFileStage::Invalid);
assert(file_stage != LogFileStage::Invalid);
assert(!parent_path.empty());
return fmt::format("{}/{}", parent_path, filename(stage));
return fmt::format("{}/{}", parent_path, filename(file_stage));
}
};

Expand Down
67 changes: 42 additions & 25 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEnt
ErrorCodes::LOGICAL_ERROR);
}
// create a new version that inherit the `being_ref_count` of the last entry
entries.emplace(ver, EntryOrDelete::newRepalcingEntry(last_iter->second, entry));
entries.emplace(ver, EntryOrDelete::newReplacingEntry(last_iter->second, entry));
}
return;
}
Expand Down Expand Up @@ -152,7 +152,7 @@ std::shared_ptr<PageIdV3Internal> VersionedPageEntries::createNewExternal(const
}
else
{
// apply a external with smaller ver than delete_ver, just ignore
// apply an external with smaller ver than delete_ver, just ignore
return nullptr;
}
}
Expand Down Expand Up @@ -479,7 +479,8 @@ bool VersionedPageEntries::cleanOutdatedEntries(
UInt64 lowest_seq,
std::map<PageIdV3Internal, std::pair<PageVersion, Int64>> * normal_entries_to_deref,
PageEntriesV3 * entries_removed,
const PageLock & /*page_lock*/)
const PageLock & /*page_lock*/,
bool keep_last_valid_var_entry)
{
if (type == EditRecordType::VAR_EXTERNAL)
{
Expand Down Expand Up @@ -525,8 +526,14 @@ bool VersionedPageEntries::cleanOutdatedEntries(
}

// If the first version less than <lowest_seq+1, 0> is entry / external,
// then we can remove those entries prev of it
bool keep_if_being_ref = !iter->second.isEntry();
// then we can remove those entries prev of it.
// If the first version less than <lowest_seq+1, 0> is delete,
// we may keep the first valid entry before the delete entry in the following case:
// 1) if `keep_last_valid_var_entry` is true
// (this is only used when dump snapshot because there may be some upsert entry in later wal files,
// so we need keep the last valid entry here to avoid the delete entry being removed)
// 2) if `being_ref_count` > 1(this means the entry is ref by other entries)
bool last_entry_is_delete = !iter->second.isEntry();
--iter; // keep the first version less than <lowest_seq+1, 0>
while (true)
{
Expand All @@ -537,9 +544,9 @@ bool VersionedPageEntries::cleanOutdatedEntries(
}
else if (iter->second.isEntry())
{
if (keep_if_being_ref)
if (last_entry_is_delete)
{
if (iter->second.being_ref_count == 1)
if (!keep_last_valid_var_entry && iter->second.being_ref_count == 1)
{
if (entries_removed)
{
Expand All @@ -549,7 +556,7 @@ bool VersionedPageEntries::cleanOutdatedEntries(
}
// The `being_ref_count` for this version is valid. While for older versions,
// theirs `being_ref_count` is invalid, don't need to be kept
keep_if_being_ref = false;
last_entry_is_delete = false;
}
else
{
Expand All @@ -570,7 +577,7 @@ bool VersionedPageEntries::cleanOutdatedEntries(
return entries.empty() || (entries.size() == 1 && entries.begin()->second.isDelete());
}

bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 * entries_removed)
bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 * entries_removed, bool keep_last_valid_var_entry)
{
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_EXTERNAL)
Expand Down Expand Up @@ -607,7 +614,7 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag

// Clean outdated entries after decreased the ref-counter
// set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries
return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock);
return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock, keep_last_valid_var_entry);
}

throw Exception(fmt::format("calling derefAndClean with invalid state [state={}]", toDebugString()));
Expand Down Expand Up @@ -665,14 +672,22 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageIdV3Internal p
}
auto last_version = last_iter->first;
auto prev_iter = --last_iter; // Note that `last_iter` should not be used anymore
if (prev_iter->second.isEntry())
while (true)
{
if (prev_iter->second.being_ref_count == 1)
return;
// It is being ref by another id, should persist the item and delete
const auto & entry = prev_iter->second;
edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count);
edit.varDel(page_id, last_version);
// if there is any entry prev to this delete entry,
// 1) the entry may be ref by another id.
// 2) the entry may be upsert into a newer wal file by the gc process.
// So we need to keep the entry item and its delete entry in the snapshot.
if (prev_iter->second.isEntry())
{
const auto & entry = prev_iter->second;
edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count);
edit.varDel(page_id, last_version);
break;
}
if (prev_iter == entries.begin())
break;
prev_iter--;
}
}
return;
Expand All @@ -697,7 +712,6 @@ PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_, UInt64 ma
, wal(std::move(wal_))
, max_persisted_log_files(max_persisted_log_files_)
, log(Logger::get("PageDirectory", std::move(storage_name)))

{
}

Expand Down Expand Up @@ -975,7 +989,7 @@ void PageDirectory::applyRefEditRecord(
const PageVersion & version)
{
// applying ref 3->2, existing ref 2->1, normal entry 1, then we should collapse
// the ref to be 3->1, increase the refcounting of normale entry 1
// the ref to be 3->1, increase the refcounting of normal entry 1
auto [resolve_success, resolved_id, resolved_ver] = [&mvcc_table_directory](PageIdV3Internal id_to_resolve, PageVersion ver_to_resolve)
-> std::tuple<bool, PageIdV3Internal, PageVersion> {
while (true)
Expand Down Expand Up @@ -1216,12 +1230,12 @@ PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_ids) con
return std::make_pair(std::move(blob_versioned_entries), total_page_size);
}

bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter)
bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter, bool force)
{
bool done_any_io = false;
// In order not to make read amplification too high, only apply compact logs when ...
auto files_snap = wal->getFilesSnapshot();
if (files_snap.needSave(max_persisted_log_files))
if (files_snap.needSave(max_persisted_log_files) || (force && (!files_snap.persisted_log_files.empty())))
{
// To prevent writes from affecting dumping snapshot (and vice versa), old log files
// are read from disk and a temporary PageDirectory is generated for dumping snapshot.
Expand All @@ -1237,15 +1251,16 @@ bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const W
PageDirectoryPtr collapsed_dir = factory.createFromReader(
identifier,
std::move(snapshot_reader),
/*wal=*/nullptr);
/* wal */ nullptr,
/* for_dump_snapshot */ true);
// The records persisted in `files_snap` is older than or equal to all records in `edit`
auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit();
done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit_from_disk), write_limiter);
}
return done_any_io;
}

PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries)
PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries, bool keep_last_valid_var_entry)
{
UInt64 lowest_seq = sequence.load();

Expand Down Expand Up @@ -1310,7 +1325,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries)
lowest_seq,
&normal_entries_to_deref,
return_removed_entries ? &all_del_entries : nullptr,
iter->second->acquireLock());
iter->second->acquireLock(),
keep_last_valid_var_entry);

{
std::unique_lock write_lock(table_rw_mutex);
Expand Down Expand Up @@ -1348,7 +1364,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries)
page_id,
/*deref_ver=*/deref_counter.first,
/*deref_count=*/deref_counter.second,
return_removed_entries ? &all_del_entries : nullptr);
return_removed_entries ? &all_del_entries : nullptr,
keep_last_valid_var_entry);

if (all_deleted)
{
Expand Down
16 changes: 11 additions & 5 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ struct EntryOrDelete
.entry = entry,
};
}
static EntryOrDelete newRepalcingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry)
static EntryOrDelete newReplacingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry)
{
return EntryOrDelete{
.is_delete = false,
Expand Down Expand Up @@ -224,13 +224,15 @@ class VersionedPageEntries
UInt64 lowest_seq,
std::map<PageIdV3Internal, std::pair<PageVersion, Int64>> * normal_entries_to_deref,
PageEntriesV3 * entries_removed,
const PageLock & page_lock);
const PageLock & page_lock,
bool keep_last_valid_var_entry = false);
bool derefAndClean(
UInt64 lowest_seq,
PageIdV3Internal page_id,
const PageVersion & deref_ver,
Int64 deref_count,
PageEntriesV3 * entries_removed);
PageEntriesV3 * entries_removed,
bool keep_last_valid_var_entry = false);

void collapseTo(UInt64 seq, PageIdV3Internal page_id, PageEntriesEdit & edit);

Expand Down Expand Up @@ -358,11 +360,15 @@ class PageDirectory

void gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter = nullptr);

bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr);
/// When create PageDirectory for dump snapshot, we should keep the last valid var_entry when it is deleted.
/// Because there may be some upsert entry in later wal files, and we should keep the valid var_entry and the delete entry to delete the later upsert entry.
/// And we don't restore the entries in blob store, because this PageDirectory is just read only for its entries.
bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr, bool force = false);

// Perform a GC for in-memory entries and return the removed entries.
// If `return_removed_entries` is false, then just return an empty set.
PageEntriesV3 gcInMemEntries(bool return_removed_entries = true);
// When dump snapshot, we need to keep the last valid entry. Check out `tryDumpSnapshot` for the reason.
PageEntriesV3 gcInMemEntries(bool return_removed_entries = true, bool keep_last_valid_var_entry = false);

std::set<PageId> getAliveExternalIds(NamespaceId ns_id) const;

Expand Down
Loading