Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_before_drop_segment) \
M(exception_after_drop_segment) \
M(exception_between_schema_change_in_the_same_diff) \
M(force_ps_wal_compact)
M(force_ps_wal_compact) \
M(pause_before_full_gc_prepare)

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,14 @@ namespace DB
F(type_mpp_query_count, {"type", "mpp_query_count"})) \
// clang-format on

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
struct ExpBuckets
{
const double start;
const int base;
const size_t size;

// NOLINTNEXTLINE(google-explicit-constructor)
inline operator prometheus::Histogram::BucketBoundaries() const &&
{
prometheus::Histogram::BucketBoundaries buckets(size);
Expand All @@ -259,6 +262,8 @@ struct EqualWidthBuckets
const size_t start;
const int num_buckets;
const size_t step;

// NOLINTNEXTLINE(google-explicit-constructor)
inline operator prometheus::Histogram::BucketBoundaries() const &&
{
// up to `num_buckets` * `step`
Expand Down Expand Up @@ -384,12 +389,17 @@ class TiFlashMetrics
APPLY_FOR_METRICS(MAKE_METRIC_ENUM_M, MAKE_METRIC_ENUM_F)
#undef APPLY_FOR_METRICS

// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_MACRO(_1, _2, NAME, ...) NAME
#ifndef GTEST_TIFLASH_METRICS
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_0(family) TiFlashMetrics::instance().family.get()
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_1(family, metric) TiFlashMetrics::instance().family.get(family##_metrics::metric)
#else
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_0(family) TestMetrics::instance().family.get()
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_1(family, metric) TestMetrics::instance().family.get(family##_metrics::metric)
#endif
#define GET_METRIC(...) \
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Page/Page.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ struct Page

using Pages = std::vector<Page>;
using PageMap = std::map<PageId, Page>;
using PageHandler = std::function<void(PageId page_id, const Page &)>;

// TODO: Move it into V2
// Indicate the page size && offset in PageFile.
Expand Down
18 changes: 0 additions & 18 deletions dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ class PageReaderImpl : private boost::noncopyable

virtual PageMap read(const PageIds & page_ids) const = 0;

virtual void read(const PageIds & page_ids, PageHandler & handler) const = 0;

using PageReadFields = PageStorage::PageReadFields;
virtual PageMap read(const std::vector<PageReadFields> & page_fields) const = 0;

Expand Down Expand Up @@ -103,11 +101,6 @@ class PageReaderImplNormal : public PageReaderImpl
return storage->read(ns_id, page_ids, read_limiter, snap);
}

void read(const PageIds & page_ids, PageHandler & handler) const override
{
storage->read(ns_id, page_ids, handler, read_limiter, snap);
}

using PageReadFields = PageStorage::PageReadFields;
PageMap read(const std::vector<PageReadFields> & page_fields) const override
{
Expand Down Expand Up @@ -218,12 +211,6 @@ class PageReaderImplMixed : public PageReaderImpl
return page_maps;
}

void read(const PageIds & page_ids, PageHandler & handler) const override
{
const auto & page_ids_not_found = storage_v3->read(ns_id, page_ids, handler, read_limiter, toConcreteV3Snapshot(), false);
storage_v2->read(ns_id, page_ids_not_found, handler, read_limiter, toConcreteV2Snapshot());
}

using PageReadFields = PageStorage::PageReadFields;
PageMap read(const std::vector<PageReadFields> & page_fields) const override
{
Expand Down Expand Up @@ -406,11 +393,6 @@ PageMap PageReader::read(const PageIds & page_ids) const
return impl->read(page_ids);
}

void PageReader::read(const PageIds & page_ids, PageHandler & handler) const
{
impl->read(page_ids, handler);
}

PageMap PageReader::read(const std::vector<PageStorage::PageReadFields> & page_fields) const
{
return impl->read(page_fields);
Expand Down
13 changes: 0 additions & 13 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,6 @@ class PageStorage : private boost::noncopyable
return readImpl(ns_id, page_ids, read_limiter, snapshot, throw_on_not_exist);
}

/**
* If throw_on_not_exist is false, Also we do have some of page_id not found.
* Then the return value will record the all of page_id which not found.
*/
PageIds read(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}, bool throw_on_not_exist = true)
{
return readImpl(ns_id, page_ids, handler, read_limiter, snapshot, throw_on_not_exist);
}

using FieldIndices = std::vector<size_t>;
using PageReadFields = std::pair<PageId, FieldIndices>;

Expand Down Expand Up @@ -223,8 +214,6 @@ class PageStorage : private boost::noncopyable

virtual PageMap readImpl(NamespaceId ns_id, const PageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual PageIds readImpl(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual PageMap readImpl(NamespaceId ns_id, const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual Page readImpl(NamespaceId ns_id, const PageReadFields & page_field, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;
Expand Down Expand Up @@ -261,8 +250,6 @@ class PageReader : private boost::noncopyable

PageMap read(const PageIds & page_ids) const;

void read(const PageIds & page_ids, PageHandler & handler) const;

using PageReadFields = PageStorage::PageReadFields;
PageMap read(const std::vector<PageReadFields> & page_fields) const;

Expand Down
57 changes: 0 additions & 57 deletions dbms/src/Storages/Page/V2/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -935,63 +935,6 @@ PageMap PageFile::Reader::read(PageIdAndEntries & to_read, const ReadLimiterPtr
return page_map;
}

void PageFile::Reader::read(PageIdAndEntries & to_read, const PageHandler & handler, const ReadLimiterPtr & read_limiter)
{
ProfileEvents::increment(ProfileEvents::PSMReadPages, to_read.size());

// Sort in ascending order by offset in file.
std::sort(to_read.begin(), to_read.end(), [](const PageIdAndEntry & a, const PageIdAndEntry & b) {
return a.second.offset < b.second.offset;
});

size_t buf_size = 0;
for (const auto & p : to_read)
buf_size = std::max(buf_size, p.second.size);

char * data_buf = static_cast<char *>(alloc(buf_size));
MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) { free(p, buf_size); });


auto it = to_read.begin();
while (it != to_read.end())
{
auto && [page_id, entry] = *it;

PageUtil::readFile(data_file, entry.offset, data_buf, entry.size, read_limiter);

if constexpr (PAGE_CHECKSUM_ON_READ)
{
auto checksum = CityHash_v1_0_2::CityHash64(data_buf, entry.size);
if (unlikely(entry.size != 0 && checksum != entry.checksum))
{
std::stringstream ss;
ss << ", expected: " << std::hex << entry.checksum << ", but: " << checksum;
throw Exception("Page [" + DB::toString(page_id) + "] checksum not match, broken file: " + data_file_path + ss.str(),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
}

Page page;
page.page_id = page_id;
page.data = ByteBuffer(data_buf, data_buf + entry.size);
page.mem_holder = mem_holder;

++it;

//#ifndef __APPLE__
// if (it != to_read.end())
// {
// auto & next_page_cache = it->second;
// ::posix_fadvise(data_file_fd, next_page_cache.offset, next_page_cache.size, POSIX_FADV_WILLNEED);
// }
//#endif

handler(page_id, page);
}

last_read_time = Clock::now();
}

PageMap PageFile::Reader::read(PageFile::Reader::FieldReadInfos & to_read, const ReadLimiterPtr & read_limiter)
{
ProfileEvents::increment(ProfileEvents::PSMReadPages, to_read.size());
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/Page/V2/PageFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ class PageFile : public Allocator<false>
/// After return, the items in to_read could be reordered, but won't be removed or added.
PageMap read(PageIdAndEntries & to_read, const ReadLimiterPtr & read_limiter = nullptr, bool background = false);

void read(PageIdAndEntries & to_read, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr);

struct FieldReadInfo
{
PageId page_id;
Expand Down
47 changes: 0 additions & 47 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,53 +697,6 @@ PageMap PageStorage::readImpl(NamespaceId /*ns_id*/, const PageIds & page_ids, c
return page_map;
}

PageIds PageStorage::readImpl(NamespaceId /*ns_id*/, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist)
{
if (!snapshot)
{
snapshot = this->getSnapshot("");
}

if (!throw_on_not_exist)
{
throw Exception("Not support throw_on_not_exist on V2", ErrorCodes::NOT_IMPLEMENTED);
}

std::map<PageFileIdAndLevel, std::pair<PageIdAndEntries, ReaderPtr>> file_read_infos;
for (auto page_id : page_ids)
{
const auto page_entry = toConcreteSnapshot(snapshot)->version()->find(page_id);
if (!page_entry)
throw Exception(fmt::format("Page {} not found", page_id), ErrorCodes::LOGICAL_ERROR);
auto file_id_level = page_entry->fileIdLevel();
auto & [page_id_and_entries, file_reader] = file_read_infos[file_id_level];
page_id_and_entries.emplace_back(page_id, *page_entry);
if (file_reader == nullptr)
{
try
{
file_reader = getReader(file_id_level);
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("(while reading Page[{}] of {})", page_id, storage_name));
throw;
}
}
}

for (auto & [file_id_level, entries_and_reader] : file_read_infos)
{
(void)file_id_level;
auto & page_id_and_entries = entries_and_reader.first;
auto & reader = entries_and_reader.second;

reader->read(page_id_and_entries, handler, read_limiter);
}

return {};
}

PageMap PageStorage::readImpl(NamespaceId /*ns_id*/, const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist)
{
if (!snapshot)
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Storages/Page/V2/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ class PageStorage : public DB::PageStorage

PageMap readImpl(NamespaceId ns_id, const PageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override;

PageIds readImpl(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override;

PageMap readImpl(NamespaceId ns_id, const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override;

DB::Page readImpl(NamespaceId ns_id, const PageReadFields & page_field, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) override;
Expand Down Expand Up @@ -209,7 +207,6 @@ class PageStorage : public DB::PageStorage
DB::Page read(PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) { return readImpl(TEST_NAMESPACE_ID, page_id, read_limiter, snapshot, true); }
PageMap read(const PageIds & page_ids) { return readImpl(TEST_NAMESPACE_ID, page_ids, nullptr, nullptr, true); }
PageMap read(const PageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) { return readImpl(TEST_NAMESPACE_ID, page_ids, read_limiter, snapshot, true); };
PageIds read(const PageIds & page_ids, const PageHandler & handler) { return readImpl(TEST_NAMESPACE_ID, page_ids, handler, nullptr, nullptr, true); }
PageMap read(const std::vector<PageReadFields> & page_fields) { return readImpl(TEST_NAMESPACE_ID, page_fields, nullptr, nullptr, true); }
void traverse(const std::function<void(const DB::Page & page)> & acceptor) { return traverseImpl(acceptor, nullptr); }
bool gc() { return gcImpl(false, nullptr, nullptr); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,26 +224,25 @@ class PSReader : public Poco::Runnable
LOG_TRACE(&Poco::Logger::get("root"), e.displayText());
}
#else
PageIds pageIds;
PageIds page_ids;
for (size_t i = 0; i < 5; ++i)
{
pageIds.emplace_back(random() % ctx.MAX_PAGE_ID);
page_ids.emplace_back(random() % ctx.MAX_PAGE_ID);
}
try
{
// std::function<void(PageId page_id, const Page &)>;
PageHandler handler = [&](PageId page_id, const Page & page) {
(void)page_id;
auto page_map = storage->read(page_ids);
for (const auto & page : page_map)
{
// use `sleep` to mock heavy read
if (heavy_read_delay_ms > 0)
{
//const uint32_t micro_seconds_to_sleep = 10;
usleep(heavy_read_delay_ms * 1000);
}
++pages_read;
bytes_read += page.data.size();
};
storage->read(pageIds, handler);
bytes_read += page.second.data.size();
}
}
catch (DB::Exception & e)
{
Expand Down
67 changes: 0 additions & 67 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,73 +526,6 @@ void BlobStore::removePosFromStats(BlobFileId blob_id, BlobFileOffset offset, si
}
}

void BlobStore::read(PageIDAndEntriesV3 & entries, const PageHandler & handler, const ReadLimiterPtr & read_limiter)
{
if (entries.empty())
{
return;
}

ProfileEvents::increment(ProfileEvents::PSMReadPages, entries.size());

// Sort in ascending order by offset in file.
std::sort(entries.begin(), entries.end(), [](const auto & a, const auto & b) {
return a.second.offset < b.second.offset;
});

// allocate data_buf that can hold all pages
size_t buf_size = 0;
for (const auto & p : entries)
buf_size = std::max(buf_size, p.second.size);

// When we read `WriteBatch` which is `WriteType::PUT_EXTERNAL`.
// The `buf_size` will be 0, we need avoid calling malloc/free with size 0.
if (buf_size == 0)
{
for (const auto & [page_id_v3, entry] : entries)
{
(void)entry;
LOG_DEBUG(log, "Read entry [page_id={}] without entry size.", page_id_v3);
Page page(page_id_v3);
handler(page_id_v3.low, page);
}
return;
}

char * data_buf = static_cast<char *>(alloc(buf_size));
MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) {
free(p, buf_size);
});

for (const auto & [page_id_v3, entry] : entries)
{
auto blob_file = read(page_id_v3, entry.file_id, entry.offset, data_buf, entry.size, read_limiter);

if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
{
ChecksumClass digest;
digest.update(data_buf, entry.size);
auto checksum = digest.checksum();
if (unlikely(entry.size != 0 && checksum != entry.checksum))
{
throw Exception(
fmt::format("Reading with entries meet checksum not match [page_id={}] [expected=0x{:X}] [actual=0x{:X}] [entry={}] [file={}]",
page_id_v3,
entry.checksum,
checksum,
toDebugString(entry),
blob_file->getPath()),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
}

Page page(page_id_v3);
page.data = ByteBuffer(data_buf, data_buf + entry.size);
page.mem_holder = mem_holder;
handler(page_id_v3.low, page);
}
}

PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_limiter)
{
if (to_read.empty())
Expand Down
Loading