diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 4609170e46d..ab5e0ced0bd 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -1126,12 +1126,14 @@ PageId PageDirectory::getMaxId() const std::set PageDirectory::getAllPageIds() { std::set page_ids; - std::shared_lock read_lock(table_rw_mutex); + std::shared_lock read_lock(table_rw_mutex); + const auto seq = sequence.load(); for (auto & [page_id, versioned] : mvcc_table_directory) { - (void)versioned; - page_ids.insert(page_id); + // Only return the page_id that is visible + if (versioned->isVisible(seq)) + page_ids.insert(page_id); } return page_ids; } diff --git a/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h index 9bd2fd44a39..497aef06340 100644 --- a/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h +++ b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h @@ -52,21 +52,17 @@ class HeavyMemoryCostInGC DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); - - stress_time = std::make_shared(30); - stress_time->start(); + startBackgroundTimer(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(100); - writer->setBatchBufferSize(1); + writer->setBufferSizeRange(1, 1); }); pool.joinAll(); stop_watch.stop(); - gc = std::make_shared(ps); + gc = std::make_shared(ps, options.gc_interval_s); gc->doGcOnce(); } @@ -78,7 +74,9 @@ class HeavyMemoryCostInGC void onFailed() override { LOG_WARNING(StressEnv::logger, - fmt::format("Memory Peak is {} , it should not bigger than {} ", metrics_dumper->getMemoryPeak(), 5 * 1024 * 1024)); + "Memory Peak is {}, it should not bigger than {}", + metrics_dumper->getMemoryPeak(), + 5 * 1024 * 1024); } }; } // namespace DB::PS::tests diff --git 
a/dbms/src/Storages/Page/workload/HeavyRead.h b/dbms/src/Storages/Page/workload/HeavyRead.h index 79101605ce1..c45c00e7c68 100644 --- a/dbms/src/Storages/Page/workload/HeavyRead.h +++ b/dbms/src/Storages/Page/workload/HeavyRead.h @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include namespace DB::PS::tests @@ -50,13 +51,9 @@ class HeavyRead : public StressWorkload { DB::PageStorageConfig config; initPageStorage(config, name()); - PSWriter::fillAllPages(ps); + initPages(MAX_PAGE_ID_DEFAULT); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); - - stress_time = std::make_shared(60); - stress_time->start(); + startBackgroundTimer(); { stop_watch.start(); startReader(options.num_readers, [](std::shared_ptr reader) -> void { diff --git a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h index 71f0df43af9..1edca02c1e6 100644 --- a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h +++ b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include namespace DB::PS::tests @@ -38,8 +39,8 @@ class HeavySkewWriteRead : public StressWorkload String desc() override { return fmt::format("Some of options will be ignored" - "`paths` will only used first one. which is {}. Data will store in {} ." - "Please cleanup folder after this test." + "`paths` will only used first one. which is {}. Data will store in {}. " + "Please cleanup folder after this test. 
" "The current workload will elapse near 60 seconds", options.paths[0], options.paths[0] + "/" + name()); @@ -51,28 +52,21 @@ class HeavySkewWriteRead : public StressWorkload DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); + startBackgroundTimer(); - stress_time = std::make_shared(60); - stress_time->start(); { stop_watch.start(); - startWriter(options.num_writers, [](std::shared_ptr writer) -> void { + const auto num_writers = options.num_writers; + startWriter(num_writers, [&](std::shared_ptr writer) { writer->setBatchBufferNums(1); - writer->setBatchBufferRange(10 * 1024, 1 * DB::MB); - writer->setWindowSize(500); - writer->setNormalDistributionSigma(13); + writer->setBufferSizeRange(0, options.avg_page_size * 2); + writer->setNormalDistributionSigma(250); }); - auto num_writers = options.num_writers; - - startReader(options.num_readers, [num_writers](std::shared_ptr reader) -> void { - reader->setPageReadOnce(5); + startReader(options.num_readers, [](std::shared_ptr reader) { + reader->setReadPageNums(5); reader->setReadDelay(0); - reader->setWriterNums(num_writers); - reader->setWindowSize(100); - reader->setNormalDistributionSigma(9); + reader->setNormalDistributionSigma(250); }); pool.joinAll(); diff --git a/dbms/src/Storages/Page/workload/HeavyWrite.h b/dbms/src/Storages/Page/workload/HeavyWrite.h index 4e88e0f34f3..fec9906cafc 100644 --- a/dbms/src/Storages/Page/workload/HeavyWrite.h +++ b/dbms/src/Storages/Page/workload/HeavyWrite.h @@ -50,16 +50,12 @@ class HeavyWrite : public StressWorkload DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); - - stress_time = std::make_shared(60); - stress_time->start(); + startBackgroundTimer(); { stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(4); - writer->setBatchBufferRange(1, 2 * DB::MB); 
+ writer->setBufferSizeRange(1, 2 * DB::MB); }); pool.joinAll(); diff --git a/dbms/src/Storages/Page/workload/HighValidBigFileGC.h b/dbms/src/Storages/Page/workload/HighValidBigFileGC.h index 87405b0ef3f..3bc233fcf9f 100644 --- a/dbms/src/Storages/Page/workload/HighValidBigFileGC.h +++ b/dbms/src/Storages/Page/workload/HighValidBigFileGC.h @@ -65,7 +65,7 @@ class HighValidBigFileGCWorkload startWriter(1, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); - writer->setBatchBufferSize(100ULL * DB::MB); + writer->setBufferSizeRange(100ULL * DB::MB, 100ULL * DB::MB); writer->setBatchBufferLimit(8ULL * DB::GB); writer->setBatchBufferPageRange(1000000); }); @@ -86,7 +86,7 @@ class HighValidBigFileGCWorkload stop_watch.start(); startWriter(1, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(4); - writer->setBatchBufferSize(2ULL * DB::MB); + writer->setBufferSizeRange(2ULL * DB::MB, 2ULL * DB::MB); writer->setBatchBufferLimit(1ULL * DB::GB); writer->setBatchBufferPageRange(1000000); }); @@ -96,14 +96,14 @@ class HighValidBigFileGCWorkload onDumpResult(); } - gc = std::make_shared(ps); + gc = std::make_shared(ps, options.gc_interval_s); gc->doGcOnce(); gc_time_ms = gc->getElapsedMilliseconds(); { stop_watch.start(); startWriter(1, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(4); - writer->setBatchBufferSize(2ULL * DB::MB); + writer->setBufferSizeRange(2ULL * DB::MB, 2ULL * DB::MB); writer->setBatchBufferLimit(1ULL * DB::GB); writer->setBatchBufferPageRange(1000000); }); @@ -123,7 +123,7 @@ class HighValidBigFileGCWorkload void onFailed() override { - LOG_WARNING(StressEnv::logger, fmt::format("GC time is {} , it should not bigger than {} ", gc_time_ms, 1 * 1000)); + LOG_WARNING(StressEnv::logger, "GC time is {} , it should not bigger than {} ", gc_time_ms, 1 * 1000); } private: diff --git a/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h index 
81816f72375..07273d9d64e 100644 --- a/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h +++ b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h @@ -52,22 +52,14 @@ class HoldSnapshotsLongTime : public StressWorkload DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); - - stress_time = std::make_shared(60); - stress_time->start(); - - scanner = std::make_shared(ps); - scanner->start(); + startBackgroundTimer(); // 90-100 snapshots will be generated. { stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); - writer->setBatchBufferRange(10 * 1024, 1 * DB::MB); - writer->setWindowSize(500); + writer->setBufferSizeRange(10 * 1024, 1 * DB::MB); writer->setNormalDistributionSigma(13); }); @@ -79,7 +71,7 @@ class HoldSnapshotsLongTime : public StressWorkload stop_watch.stop(); } - gc = std::make_shared(ps); + gc = std::make_shared(ps, options.gc_interval_s); // Normal GC gc->doGcOnce(); diff --git a/dbms/src/Storages/Page/workload/MainEntry.cpp b/dbms/src/Storages/Page/workload/MainEntry.cpp index 18e42106c90..19be50e4924 100644 --- a/dbms/src/Storages/Page/workload/MainEntry.cpp +++ b/dbms/src/Storages/Page/workload/MainEntry.cpp @@ -44,7 +44,7 @@ int StressWorkload::mainEntry(int argc, char ** argv) auto env = StressEnv::parse(argc, argv); env.setup(); - auto & mamager = StressWorkloadManger::getInstance(); + auto & mamager = PageWorkloadFactory::getInstance(); mamager.setEnv(env); mamager.runWorkload(); @@ -55,4 +55,4 @@ int StressWorkload::mainEntry(int argc, char ** argv) DB::tryLogCurrentException(""); exit(-1); } -} \ No newline at end of file +} diff --git a/dbms/src/Storages/Page/workload/Normal.h b/dbms/src/Storages/Page/workload/Normal.h index 033f226bf89..c25352c30bc 100644 --- a/dbms/src/Storages/Page/workload/Normal.h +++ b/dbms/src/Storages/Page/workload/Normal.h @@ -12,6 +12,8 @@ // See the License for 
the specific language governing permissions and // limitations under the License. +#include +#include #include namespace DB::PS::tests @@ -48,25 +50,22 @@ class NormalWorkload config.num_write_slots = options.num_writer_slots; initPageStorage(config); - if (options.avg_page_size_mb != 0) - { - PSWriter::setApproxPageSize(options.avg_page_size_mb); - } - // init all pages in PageStorage - if (options.init_pages || options.just_init_pages) + if (options.init_pages) { - PSWriter::fillAllPages(ps); + static constexpr PageId MAX_PAGE_ID_DEFAULT = 1000; + initPages(MAX_PAGE_ID_DEFAULT); LOG_INFO(StressEnv::logger, "All pages have been init."); - if (options.just_init_pages) - { - return; - } } + RUNTIME_CHECK(options.avg_page_size > 1000); + stop_watch.start(); - startWriter(options.num_writers); + startWriter(options.num_writers, [this](std::shared_ptr w) { + // A small random range + w->setBufferSizeRange(options.avg_page_size - 1000 / 2, options.avg_page_size + 1000 / 2); + }); const size_t read_delay_ms = options.read_delay_ms; startReader(options.num_readers, [read_delay_ms](std::shared_ptr reader) -> void { reader->setReadDelay(read_delay_ms); diff --git a/dbms/src/Storages/Page/workload/PSBackground.cpp b/dbms/src/Storages/Page/workload/PSBackground.cpp index 7cfc335c3a9..b4e31f5ef9f 100644 --- a/dbms/src/Storages/Page/workload/PSBackground.cpp +++ b/dbms/src/Storages/Page/workload/PSBackground.cpp @@ -71,7 +71,7 @@ void PSGc::start() gc_timer.start(Poco::TimerCallback(*this, &PSGc::onTime)); } -void PSScanner::onTime(Poco::Timer & /*timer*/) +void PSSnapStatGetter::onTime(Poco::Timer & /*timer*/) { try { @@ -94,9 +94,9 @@ void PSScanner::onTime(Poco::Timer & /*timer*/) } } -void PSScanner::start() +void PSSnapStatGetter::start() { - scanner_timer.start(Poco::TimerCallback(*this, &PSScanner::onTime)); + scanner_timer.start(Poco::TimerCallback(*this, &PSSnapStatGetter::onTime)); } // NOLINTNEXTLINE(readability-convert-member-functions-to-static) diff --git 
a/dbms/src/Storages/Page/workload/PSBackground.h b/dbms/src/Storages/Page/workload/PSBackground.h index c91dad1361f..5474c8cf013 100644 --- a/dbms/src/Storages/Page/workload/PSBackground.h +++ b/dbms/src/Storages/Page/workload/PSBackground.h @@ -58,6 +58,11 @@ class PSMetricsDumper void start(); + void stop() + { + timer_status.stop(); + } + UInt32 getMemoryPeak() const { auto info = metrics.find(CurrentMetrics::MemoryTracking); @@ -99,12 +104,12 @@ class PSGc PSPtr ps; public: - explicit PSGc(const PSPtr & ps_) + explicit PSGc(const PSPtr & ps_, uint64_t interval) : ps(ps_) { assert(ps != nullptr); gc_timer.setStartInterval(1000); - gc_timer.setPeriodicInterval(30 * 1000); + gc_timer.setPeriodicInterval(interval * 1000); } void doGcOnce(); @@ -113,6 +118,11 @@ class PSGc void start(); + void stop() + { + gc_timer.stop(); + } + UInt64 getElapsedMilliseconds() { return gc_stop_watch.elapsedMilliseconds(); @@ -124,12 +134,12 @@ class PSGc }; using PSGcPtr = std::shared_ptr; -class PSScanner +class PSSnapStatGetter { PSPtr ps; public: - explicit PSScanner(const PSPtr & ps_) + explicit PSSnapStatGetter(const PSPtr & ps_) : ps(ps_) { assert(ps != nullptr); @@ -142,10 +152,15 @@ class PSScanner void start(); + void stop() + { + scanner_timer.stop(); + } + private: Poco::Timer scanner_timer; }; -using PSScannerPtr = std::shared_ptr; +using PSSnapStatGetterPtr = std::shared_ptr; class StressTimeout { @@ -153,12 +168,16 @@ class StressTimeout explicit StressTimeout(size_t timeout_s) { StressEnvStatus::getInstance().setStat(STATUS_LOOP); - LOG_INFO(StressEnv::logger, fmt::format("Timeout: {}s", timeout_s)); + LOG_INFO(StressEnv::logger, "Timeout: {}s", timeout_s); timeout_timer.setStartInterval(timeout_s * 1000); } void onTime(Poco::Timer & timer); void start(); + void stop() + { + timeout_timer.stop(); + } private: Poco::Timer timeout_timer; diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index 
01fd89c0377..591063d43cb 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -12,20 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include #include #include #include +#include #include +#include #include +#include #include +#include #include namespace DB::PS::tests { + +void GlobalStat::commit(const RandomPageId & c) +{ + std::lock_guard lock(mtx_page_id); + commit_ids.insert(c.page_id); + for (const auto & id : c.page_id_to_remove) + { + commit_ids.erase(id); + pending_remove_ids.erase(id); + } +} + void PSRunnable::run() try { @@ -43,6 +60,8 @@ try } catch (...) { + // stop the whole testing + StressEnvStatus::getInstance().setStat(StressEnvStat::STATUS_EXCEPTION); DB::tryLogCurrentException(StressEnv::logger); } @@ -56,138 +75,121 @@ size_t PSRunnable::getPagesUsed() const return pages_used; } -size_t PSWriter::approx_page_mb = 2; -void PSWriter::setApproxPageSize(size_t size_mb) -{ - LOG_INFO(StressEnv::logger, "Page approx size is set to {} MB", size_mb); - approx_page_mb = size_mb; -} - -DB::ReadBufferPtr PSWriter::genRandomData(const DB::PageId pageId, DB::MemHolder & holder) -{ - // fill page with random bytes - std::mt19937 size_gen; - size_gen.seed(time(nullptr)); - std::uniform_int_distribution<> dist(0, 3000); - - const size_t buff_sz = approx_page_mb * DB::MB + dist(size_gen); - char * buff = static_cast(malloc(buff_sz)); // NOLINT - if (buff == nullptr) - { - throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); - } - - const char buff_ch = pageId % 0xFF; - memset(buff, buff_ch, buff_sz); +/// +/// Writer +/// - holder = DB::createMemHolder(buff, [&](char * p) { free(p); }); // NOLINT - - return std::make_shared(const_cast(buff), buff_sz); -} - -void PSWriter::updatedRandomData() +DB::ReadBufferPtr PSWriter::getRandomData() { - size_t memory_size = approx_page_mb * 
DB::MB * 2; if (memory == nullptr) { - memory = static_cast(malloc(memory_size)); // NOLINT - if (memory == nullptr) - { - throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); - } - for (size_t i = 0; i < memory_size; i++) - { - memset(memory + i, i % 0xFF, sizeof(char)); - } + memory = std::unique_ptr(new char[buffer_size_max + 1]); + for (size_t i = 0; i < buffer_size_max + 1; i++) + memory[i] = i % 0xFF; } - std::uniform_int_distribution<> dist(0, memory_size / 2 - 1); + std::uniform_int_distribution<> dist(buffer_size_min, buffer_size_max); size_t gen_size = dist(gen); - buff_ptr = std::make_shared(memory + gen_size, memory_size - gen_size); + return std::make_shared(memory.get(), gen_size); } -void PSWriter::fillAllPages(const PSPtr & ps) +void PSWriter::setBufferSizeRange(size_t min, size_t max) { - for (DB::PageId page_id = 0; page_id <= MAX_PAGE_ID_DEFAULT; ++page_id) - { - DB::MemHolder holder; - DB::ReadBufferPtr buff = genRandomData(page_id, holder); - - DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; - wb.putPage(page_id, 0, buff, buff->buffer().size()); - ps->write(std::move(wb)); - if (page_id % 100 == 0) - LOG_INFO(StressEnv::logger, "writer wrote page {}", page_id); - } + RUNTIME_CHECK(min >= 1); + RUNTIME_CHECK(max >= min); + buffer_size_min = min; + buffer_size_max = max; + + if (buffer_size_max - buffer_size_min >= 4096) + LOG_WARNING(StressEnv::logger, "The result maybe not stable, min_size={} max_size={}", min, max); } -bool PSWriter::runImpl() +void PSWriter::write(const RandomPageId & r) { - const DB::PageId page_id = genRandomPageId(); - updatedRandomData(); + auto buff_ptr = getRandomData(); DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; - wb.putPage(page_id, 0, buff_ptr, buff_ptr->buffer().size()); + wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size()); + for (const auto id : r.page_id_to_remove) + wb.delPage(id); ps->write(std::move(wb)); - ++pages_used; + + pages_used += 1; bytes_used += 
buff_ptr->buffer().size(); + + // verbose logging for debug + // LOG_TRACE(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); + + global_stat->commit(r); +} + +bool PSWriter::runImpl() +{ + write(genRandomPageId()); return true; } -DB::PageId PSWriter::genRandomPageId() +RandomPageId PSWriter::genRandomPageId() { - std::normal_distribution<> distribution{static_cast(max_page_id) / 2, 150}; - return static_cast(std::round(distribution(gen))) % max_page_id; + // std::normal_distribution<> distribution{static_cast(max_page_id) / 2, 150}; + std::uniform_int_distribution dist(0UL, max_page_id - 1); + return RandomPageId(static_cast(std::round(dist(gen)))); } -void PSCommonWriter::updatedRandomData() +DB::ReadBufferPtr PSCommonWriter::getRandomData() { // Calculate the fixed memory size - size_t single_buff_size = ((buffer_size_min <= buffer_size_max && buffer_size_max > 0) ? buffer_size_max - : batch_buffer_size); + size_t single_buff_size = buffer_size_max; size_t memory_size = single_buff_size * batch_buffer_nums; if (memory == nullptr) { - memory = static_cast(malloc(memory_size)); // NOLINT - if (memory == nullptr) - { - throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); - } - + memory = std::unique_ptr(new char[memory_size]); for (size_t i = 0; i < memory_size; i++) - { - memset(memory + i, i % 0xFF, sizeof(char)); - } + memory[i] = i % 0xFF; } - buff_ptrs.clear(); - - size_t gen_size = genBufferSize(); - for (size_t i = 0; i < batch_buffer_nums; ++i) - { - buff_ptrs.emplace_back(std::make_shared(memory + i * single_buff_size, gen_size)); - } + std::uniform_int_distribution<> dist(buffer_size_min, buffer_size_max); + size_t gen_size = dist(gen); + return std::make_shared(memory.get(), gen_size); } -DB::PageId writing_page[1000]; - bool PSCommonWriter::runImpl() { - const DB::PageId page_id = genRandomPageId(); + const auto r = genRandomPageId(); + size_t page_write = 0; + size_t bytes_write = 0; + 
// FIXME: update one page_id by multiple data in one write batch? DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; - updatedRandomData(); - - for (auto & buffptr : buff_ptrs) + for (size_t i = 0; i < batch_buffer_nums; ++i) { - wb.putPage(page_id, 0, buffptr, buffptr->buffer().size()); - ++pages_used; - bytes_used += buffptr->buffer().size(); + auto buff_ptr = getRandomData(); + if (data_sizes.empty()) + { + wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size()); + } + else + { + // mock test for wide table that store many in-page-offsets + wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size(), data_sizes); + } + page_write += 1; + bytes_write += buff_ptr->buffer().size(); } + for (const auto & page_id : r.page_id_to_remove) + wb.delPage(page_id); ps->write(std::move(wb)); - return (batch_buffer_limit == 0 || bytes_used < batch_buffer_limit); + + pages_used += page_write; + bytes_used += bytes_write; + + // verbose logging for debug + // LOG_TRACE(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); + global_stat->commit(r); + bool keep_running = (batch_buffer_limit == 0 || bytes_used < batch_buffer_limit); + return keep_running; } void PSCommonWriter::setBatchBufferNums(size_t numbers) @@ -195,11 +197,6 @@ void PSCommonWriter::setBatchBufferNums(size_t numbers) batch_buffer_nums = numbers; } -void PSCommonWriter::setBatchBufferSize(size_t size) -{ - batch_buffer_size = size; -} - void PSCommonWriter::setBatchBufferLimit(size_t size_limit) { batch_buffer_limit = size_limit; @@ -210,42 +207,19 @@ void PSCommonWriter::setBatchBufferPageRange(size_t max_page_id_) max_page_id = max_page_id_; } -DB::PageId PSCommonWriter::genRandomPageId() -{ - std::uniform_int_distribution<> dist(0, max_page_id); - return static_cast(dist(gen)); -} - -void PSCommonWriter::setBatchBufferRange(size_t min, size_t max) -{ - if (max > min && min > 0) - { - buffer_size_min = min; - buffer_size_max = max; - } -} - void 
PSCommonWriter::setFieldSize(const DB::PageFieldSizes & data_sizes_) { data_sizes = data_sizes_; } -size_t PSCommonWriter::genBufferSize() -{ - // If set min/max size set, use the range. Otherwise, use batch_buffer_size. - if (buffer_size_min <= buffer_size_max && buffer_size_max > 0) - { - std::uniform_int_distribution<> dist(buffer_size_min, buffer_size_max); - return dist(gen); - } - return batch_buffer_size; -} - +/// +/// Reader +/// DB::PageIds PSReader::genRandomPageIds() { DB::PageIds page_ids; - for (size_t i = 0; i < page_read_once; ++i) + for (size_t i = 0; i < num_pages_read; ++i) { std::uniform_int_distribution<> dist(0, max_page_id); page_ids.emplace_back(static_cast(dist(gen))); @@ -256,6 +230,8 @@ DB::PageIds PSReader::genRandomPageIds() bool PSReader::runImpl() { DB::PageIds page_ids = genRandomPageIds(); + if (page_ids.empty()) + return true; auto page_map = ps->read(DB::TEST_NAMESPACE_ID, page_ids); for (const auto & page : page_map) @@ -270,11 +246,6 @@ bool PSReader::runImpl() return true; } -void PSReader::setPageReadOnce(size_t page_read_once_) -{ - page_read_once = page_read_once_; -} - void PSReader::setReadDelay(size_t delay_ms) { heavy_read_delay_ms = delay_ms; @@ -287,103 +258,106 @@ void PSReader::setReadPageRange(size_t max_page_id_) void PSReader::setReadPageNums(size_t page_read_once_) { - page_read_once = page_read_once_; + num_pages_read = page_read_once_; } -void PSWindowWriter::setWindowSize(size_t window_size_) -{ - window_size = window_size_; -} +/// +/// WindowWriter +/// void PSWindowWriter::setNormalDistributionSigma(size_t sigma_) { sigma = sigma_; } -UInt64 pageid_boundary = 0; -std::mutex page_id_mutex; - -DB::PageId PSWindowWriter::genRandomPageId() +RandomPageId PSWindowWriter::genRandomPageId() { - std::lock_guard page_id_lock(page_id_mutex); - if (pageid_boundary < (window_size / 2)) - { - writing_page[index] = pageid_boundary++; - return static_cast(writing_page[index]); - } + std::lock_guard 
page_id_lock(global_stat->mtx_page_id); + DB::PageIdSet ids_to_del; + DB::PageId page_id = [this, &ids_to_del]() { + if (global_stat->right_id_boundary < 4 * sigma) + { + return global_stat->right_id_boundary++; + } - // Generate a random number in the window - std::normal_distribution<> distribution{static_cast(window_size), static_cast(sigma)}; - auto random = std::round(distribution(gen)); - // Move this "random" near the pageid_boundary, If "random" is still negative, then make it positive - random = std::abs(random + pageid_boundary); + // Generate a random number in the window, normal dist by μ=0 and σ=sigma + std::normal_distribution distribution{0.0, static_cast(sigma)}; + auto random = std::round(distribution(gen)); + // 100 - (100 - 68)/2 == 84% probability that update the existing page id + if (random <= sigma) + { + // Move this "random" near the right boundary - σ, (mock a hot write in an id range) + // we will update the data in this page_id + DB::PageId page_id = std::abs(global_stat->right_id_boundary - sigma + random); + return std::max(page_id, global_stat->left_id_boundary.load()); + } - auto page_id = static_cast(random > pageid_boundary ? pageid_boundary++ : random); - writing_page[index] = page_id; - return page_id; -} + // Else it is about 16% probability that we create a new page. + // Also we consider the pages with id less than (right boundary - 4σ) have no chance (less than 0.01% + // by the definition of normal distribution) for being read later, remove the pages. 
+ DB::PageId left_boundary = 0; + if (global_stat->right_id_boundary > 3 * sigma) // ensure the new left boundary is not negative + left_boundary = global_stat->right_id_boundary - 3 * sigma; + global_stat->left_id_boundary = left_boundary; -void PSWindowReader::setWindowSize(size_t window_size_) -{ - window_size = window_size_; + // Remove the page id that is not likely update/read any more + for (const auto & id : global_stat->commit_ids) + { + if (id >= left_boundary) + break; + ids_to_del.insert(id); + global_stat->pending_remove_ids.insert(id); + } + + auto page_id = global_stat->right_id_boundary++; + if (page_id % 200 == 0) + LOG_INFO(StressEnv::logger, "Update boundary to [{}, {})", left_boundary, global_stat->right_id_boundary); + return page_id; + }(); + return RandomPageId(page_id, ids_to_del); } +/// +/// WindowReader +/// + void PSWindowReader::setNormalDistributionSigma(size_t sigma_) { sigma = sigma_; } -void PSWindowReader::setWriterNums(size_t writer_nums_) -{ - writer_nums = writer_nums_; -} - DB::PageIds PSWindowReader::genRandomPageIds() { - std::vector page_ids; - - if (pageid_boundary <= (writer_nums + page_read_once)) - { - // Nothing to read - return page_ids; - } - - size_t read_boundary = pageid_boundary - writer_nums - page_read_once; - if (read_boundary < window_size) - { - return page_ids; - } + const auto page_id_boundary_copy = global_stat->right_id_boundary.load(); + // Nothing to read + if (page_id_boundary_copy < num_pages_read) + return {}; + + const size_t read_right_boundary = page_id_boundary_copy - num_pages_read; + + // Generate a random number in the window, normal dist by μ=0 and σ=sigma + std::normal_distribution<> distribution{0.0, static_cast(sigma)}; + double r = distribution(gen); + // id > (right boundary+σ) is likely not written, turn the `r` into the left side of boundary + // for reading + if (r > sigma) + r = -r; + double rand_id = std::round(read_right_boundary - sigma + r); // the rand_id is double since it 
could be < 0.0 + // Limit by boundary + rand_id = std::max(rand_id, global_stat->left_id_boundary.load()); + rand_id = std::min(rand_id, read_right_boundary); - std::normal_distribution<> distribution{static_cast(window_size), - static_cast(sigma)}; - auto rand_id = std::round(distribution(gen)); - - rand_id = read_boundary - window_size + rand_id; - - // Bigger than window right boundary - if (rand_id > read_boundary) - { - rand_id = read_boundary; - } - - // Smaller than window left boundary - if (rand_id < 0) - { - rand_id = std::abs(rand_id); - } - - for (size_t i = rand_id; i < page_read_once + rand_id; ++i) + DB::PageIds page_ids; + std::lock_guard lock(global_stat->mtx_page_id); { - bool writing = false; - for (size_t j = 0; j < writer_nums; j++) + for (size_t id = rand_id; id < num_pages_read + rand_id; ++id) { - if (i == writing_page[j]) + if (global_stat->commit_ids.find(id) != global_stat->commit_ids.end() + && global_stat->pending_remove_ids.find(id) == global_stat->pending_remove_ids.end()) { - writing = true; + page_ids.emplace_back(id); } } - if (!writing) - page_ids.emplace_back(i); } return page_ids; @@ -412,8 +386,8 @@ void PSIncreaseWriter::setPageRange(size_t page_range) end_page_id = (index + 1) * page_range + 1; } -DB::PageId PSIncreaseWriter::genRandomPageId() +RandomPageId PSIncreaseWriter::genRandomPageId() { - return static_cast(begin_page_id++); + return RandomPageId(begin_page_id++); } } // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/workload/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h index b723236391d..72e2b81e46c 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -17,10 +17,9 @@ #include #include -const DB::PageId MAX_PAGE_ID_DEFAULT = 1000; - namespace DB::PS::tests { +static constexpr PageId MAX_PAGE_ID_DEFAULT = 1000; class PSRunnable : public Poco::Runnable { public: @@ -36,24 +35,55 @@ class PSRunnable : public Poco::Runnable size_t 
pages_used = 0; }; -class PSWriter : public PSRunnable +// The random page id for adding and removing +struct RandomPageId +{ + // The new page id to add to pagestorage + DB::PageId page_id; + // The page ids to removed from pagestorage + DB::PageIdSet page_id_to_remove; + + explicit RandomPageId(DB::PageId new_page_id) + : page_id(new_page_id) + {} + + RandomPageId(DB::PageId new_page_id, DB::PageIdSet page_id_to_remove_) + : page_id(new_page_id) + , page_id_to_remove(page_id_to_remove_) + { + } +}; + +// The shared status inside workload +struct GlobalStat { - static size_t approx_page_mb; + // shared status between PSWindowWriter and PSWindowReader + std::mutex mtx_page_id; + // The page ids between [left_id_boundary, right_id_boundary) + // and exists in `commit_ids` && not exists in `pending_remove_ids` + // is readable + std::atomic right_id_boundary = 0; + std::atomic left_id_boundary = 0; + std::set commit_ids; + std::set pending_remove_ids; + + void commit(const RandomPageId & c); +}; +class PSWriter : public PSRunnable +{ public: - PSWriter(const PSPtr & ps_, DB::UInt32 index_) + PSWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) : ps(ps_) , index(index_) + , global_stat(global_stat_) { gen.seed(time(nullptr)); } ~PSWriter() override { - if (memory != nullptr) - { - free(memory); - } + memory.reset(); } String description() override @@ -61,26 +91,28 @@ class PSWriter : public PSRunnable return fmt::format("(Stress Test Writer {})", index); } - static void setApproxPageSize(size_t size_mb); - - static DB::ReadBufferPtr genRandomData(DB::PageId pageId, DB::MemHolder & holder); + void setBufferSizeRange(size_t min, size_t max); - virtual void updatedRandomData(); - - static void fillAllPages(const PSPtr & ps); + virtual DB::ReadBufferPtr getRandomData(); bool runImpl() override; + void write(const RandomPageId & r); + protected: - virtual DB::PageId genRandomPageId(); + virtual RandomPageId genRandomPageId(); protected: 
PSPtr ps; DB::UInt32 index = 0; std::mt19937 gen; DB::PageId max_page_id = MAX_PAGE_ID_DEFAULT; - char * memory = nullptr; - DB::ReadBufferPtr buff_ptr; + std::unique_ptr memory; + + size_t buffer_size_min = 1 * 1024 * 1024; + size_t buffer_size_max = 1 * 1024 * 1024; + + const std::unique_ptr & global_stat; }; @@ -89,11 +121,11 @@ class PSWriter : public PSRunnable class PSCommonWriter : public PSWriter { public: - PSCommonWriter(const PSPtr & ps_, DB::UInt32 index_) - : PSWriter(ps_, index_) + PSCommonWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSWriter(ps_, index_, global_stat_) {} - void updatedRandomData() override; + DB::ReadBufferPtr getRandomData() override; String description() override { return fmt::format("(Stress Test Common Writer {})", index); } @@ -101,36 +133,24 @@ class PSCommonWriter : public PSWriter void setBatchBufferNums(size_t numbers); - void setBatchBufferSize(size_t size); - void setBatchBufferLimit(size_t size_limit); void setBatchBufferPageRange(size_t max_page_id_); - void setBatchBufferRange(size_t min, size_t max); - void setFieldSize(const DB::PageFieldSizes & data_sizes); protected: - std::vector buff_ptrs; size_t batch_buffer_nums = 100; - size_t batch_buffer_size = 1 * DB::MB; size_t batch_buffer_limit = 0; - size_t buffer_size_min = 0; - size_t buffer_size_max = 0; - DB::PageFieldSizes data_sizes = {}; - - DB::PageId genRandomPageId() override; - virtual size_t genBufferSize(); }; - // PSWindowsWriter can better simulate the user's workload in cooperation with PSWindowsReader // It can also be used as an independent writer to imitate user writing. // When the user is using TiFlash, The Pageid which in PageStorage should be continuously incremented. -// In the meantime, The PageId near the end may be constantly updated.So PSWindowsWriter looks like: +// In the meantime, The PageId near the end may be constantly updated. 
So the page ids updated by +// this class look like: // // // | Pageid 1 Pageid 100 Pageid N | @@ -138,25 +158,23 @@ class PSCommonWriter : public PSWriter // | | // | window | // -// Every time the pageid written will be generated from the "window" range. -// And The random from the window should conform to the normal distribution. +// Each page_id to write is generated from a uniform distribution. // When random number bigger than Pageid N, it will be Pageid N + 1, it means new page come. // When random number smaller than Pageid N, it means Page updated. +// And `genRandomPageId` will also return the page_id that is not likely to be updated later. class PSWindowWriter : public PSCommonWriter { public: - PSWindowWriter(const PSPtr & ps_, DB::UInt32 index_) - : PSCommonWriter(ps_, index_) + PSWindowWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSCommonWriter(ps_, index_, global_stat_) {} String description() override { return fmt::format("(Stress Test Window Writer {})", index); } - void setWindowSize(size_t window_size); - void setNormalDistributionSigma(size_t sigma); protected: - DB::PageId genRandomPageId() override; + RandomPageId genRandomPageId() override; protected: size_t window_size = 100; @@ -166,8 +184,8 @@ class PSWindowWriter : public PSCommonWriter class PSIncreaseWriter : public PSCommonWriter { public: - PSIncreaseWriter(const PSPtr & ps_, DB::UInt32 index_) - : PSCommonWriter(ps_, index_) + PSIncreaseWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSCommonWriter(ps_, index_, global_stat_) {} String description() override { return fmt::format("(Stress Test Increase Writer {})", index); } @@ -177,7 +195,7 @@ class PSIncreaseWriter : public PSCommonWriter void setPageRange(size_t page_range); protected: - DB::PageId genRandomPageId() override; + RandomPageId genRandomPageId() override; protected: size_t begin_page_id = 1; @@ -187,9 +205,10 @@ class PSIncreaseWriter 
: public PSCommonWriter class PSReader : public PSRunnable { public: - PSReader(const PSPtr & ps_, DB::UInt32 index_) + PSReader(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) : ps(ps_) , index(index_) + , global_stat(global_stat_) { gen.seed(time(nullptr)); } @@ -198,8 +217,6 @@ class PSReader : public PSRunnable bool runImpl() override; - void setPageReadOnce(size_t page_read_once); - void setReadDelay(size_t delay_ms); void setReadPageRange(size_t max_page_id); @@ -213,9 +230,10 @@ class PSReader : public PSRunnable PSPtr ps; std::mt19937 gen; size_t heavy_read_delay_ms = 0; - size_t page_read_once = 5; + size_t num_pages_read = 5; DB::UInt32 index = 0; DB::PageId max_page_id = MAX_PAGE_ID_DEFAULT; + const std::unique_ptr & global_stat; }; // PSWindowReader can better simulate the user's workload in cooperation with PSWindowsWriter @@ -233,23 +251,17 @@ class PSReader : public PSRunnable class PSWindowReader : public PSReader { public: - PSWindowReader(const PSPtr & ps_, DB::UInt32 index_) - : PSReader(ps_, index_) + PSWindowReader(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSReader(ps_, index_, global_stat_) {} - void setWindowSize(size_t window_size); - void setNormalDistributionSigma(size_t sigma); - void setWriterNums(size_t writer_nums); - protected: DB::PageIds genRandomPageIds() override; protected: - size_t window_size = 100; size_t sigma = 11; - size_t writer_nums = 0; std::mt19937 gen; }; @@ -259,8 +271,8 @@ class PSWindowReader : public PSReader class PSSnapshotReader : public PSReader { public: - PSSnapshotReader(const PSPtr & ps_, DB::UInt32 index_) - : PSReader(ps_, index_) + PSSnapshotReader(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSReader(ps_, index_, global_stat_) {} bool runImpl() override; diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.cpp b/dbms/src/Storages/Page/workload/PSStressEnv.cpp index 288c8b13c2c..49a651e7388 100644 --- 
a/dbms/src/Storages/Page/workload/PSStressEnv.cpp +++ b/dbms/src/Storages/Page/workload/PSStressEnv.cpp @@ -13,6 +13,7 @@ // limitations under the License. +#include #include #include #include @@ -49,16 +50,16 @@ StressEnv StressEnv::parse(int argc, char ** argv) desc.add_options()("help,h", "produce help message") // ("write_concurrency,W", value()->default_value(4), "number of write threads") // ("read_concurrency,R", value()->default_value(16), "number of read threads") // - ("clean_before_run,C", value()->default_value(false), "drop data before running") // - ("init_pages,I", value()->default_value(false), "init pages if not exist before running") // - ("just_init_pages,J", value()->default_value(false), "Only init pages 0 - 1000.Then quit") // + ("dropdata", value()->default_value(true), "drop data before running") // + ("init_pages", value()->default_value(false), "init pages if not exist before running") // ("timeout,T", value()->default_value(600), "maximum run time (seconds). 0 means run infinitely") // - ("writer_slots", value()->default_value(4), "number of PageStorage writer slots") // + ("writer_slots", value()->default_value(4), "number of PageStorage writer slots (for V2)") // ("read_delay_ms", value()->default_value(0), "millionseconds of read delay") // - ("avg_page_size", value()->default_value(1), "avg size for each page(MiB)") // + ("avg_page_size", value()->default_value(2 * 1024 * 1024), "avg size for each page(bytes). 2 MiB by default") // ("paths,P", value>(), "store path(s)") // - ("failpoints,F", value>(), "failpoint(s) to enable") // - ("status_interval,S", value()->default_value(1), "Status statistics interval. 0 means no statistics") // + ("failpoints", value>(), "failpoint(s) to enable") // + ("gc_interval", value()->default_value(30), "GC interval(seconds). 0 means no gc") // + ("status_interval", value()->default_value(5), "Status statistics interval(seconds). 
0 means no statistics") // ("situation_mask,M", value()->default_value(0), "Run special tests sequentially, example -M 2") // ("verify", value()->default_value(true), "Run special tests sequentially with verify.") // ("running_ps_version,V", value()->default_value(3), "Select a version of PageStorage. 2 or 3 can used"); @@ -70,7 +71,7 @@ StressEnv StressEnv::parse(int argc, char ** argv) if (options.count("help") > 0) { std::cerr << desc << std::endl; - std::cerr << StressWorkloadManger::getInstance().toDebugStirng() << std::endl; + std::cerr << PageWorkloadFactory::getInstance().toDebugStirng() << std::endl; exit(0); } @@ -78,12 +79,12 @@ StressEnv StressEnv::parse(int argc, char ** argv) opt.num_writers = options["write_concurrency"].as(); opt.num_readers = options["read_concurrency"].as(); opt.init_pages = options["init_pages"].as(); - opt.just_init_pages = options["just_init_pages"].as(); - opt.clean_before_run = options["clean_before_run"].as(); + opt.dropdata = options["dropdata"].as(); opt.timeout_s = options["timeout"].as(); opt.read_delay_ms = options["read_delay_ms"].as(); opt.num_writer_slots = options["writer_slots"].as(); - opt.avg_page_size_mb = options["avg_page_size"].as(); + opt.avg_page_size = options["avg_page_size"].as(); + opt.gc_interval_s = options["gc_interval"].as(); opt.status_interval = options["status_interval"].as(); opt.situation_mask = options["situation_mask"].as(); opt.verify = options["verify"].as(); @@ -96,6 +97,8 @@ StressEnv StressEnv::parse(int argc, char ** argv) exit(0); } + RUNTIME_CHECK(opt.avg_page_size > 0); + if (options.count("paths")) opt.paths = options["paths"].as>(); else @@ -109,8 +112,9 @@ StressEnv StressEnv::parse(int argc, char ** argv) void setupSignal() { signal(SIGINT, [](int /*signal*/) { - LOG_ERROR(StressEnv::logger, "Receive finish signal. Wait for the GC threads to end."); + LOG_INFO(StressEnv::logger, "Receive finish signal. 
Wait for the threads finish"); StressEnvStatus::getInstance().setStat(STATUS_INTERRUPT); + PageWorkloadFactory::getInstance().stopWorkload(); }); } @@ -134,17 +138,17 @@ void StressEnv::setup() if (Poco::File file(path); file.exists()) { all_directories_not_exist = false; - if (clean_before_run) + if (dropdata) { file.remove(true); } } } - if (clean_before_run) + if (dropdata) LOG_INFO(StressEnv::logger, "All pages have been drop."); - if (clean_before_run || all_directories_not_exist) + if (dropdata || all_directories_not_exist) init_pages = true; setupSignal(); } diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.h b/dbms/src/Storages/Page/workload/PSStressEnv.h index e67cb325430..3cb583a05af 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.h +++ b/dbms/src/Storages/Page/workload/PSStressEnv.h @@ -77,13 +77,13 @@ struct StressEnv size_t num_writers = 1; size_t num_readers = 4; bool init_pages = false; - bool just_init_pages = false; - bool clean_before_run = false; + bool dropdata = false; + size_t gc_interval_s = 30; size_t timeout_s = 0; size_t read_delay_ms = 0; size_t num_writer_slots = 1; - size_t avg_page_size_mb = 1; - size_t status_interval = 1; + size_t avg_page_size = 2 * 1024 * 1024; + size_t status_interval = 5; size_t situation_mask = 0; bool verify = true; size_t running_ps_version = 3; @@ -95,28 +95,29 @@ struct StressEnv { return fmt::format( "{{ " - "num_writers: {}, num_readers: {}, init_pages: {}, just_init_pages: {}" - ", clean_before_run: {}, timeout_s: {}, read_delay_ms: {}, num_writer_slots: {}" - ", avg_page_size_mb: {}, paths: [{}], failpoints: [{}]" - ", status_interval: {}, situation_mask: {}, verify: {}" - ", running_pagestorage_version : {}." 
+ "num_writers: {}, num_readers: {}, init_pages: {}" + ", dropdata: {}, timeout_s: {}, read_delay_ms: {}, num_writer_slots: {}" + ", avg_page_size: {}, paths: [{}], failpoints: [{}]" + ", gc_interval_s: {}" + ", status_interval: {}, verify: {}" + ", situation_mask: {}" + ", running_pagestorage_version: {}" "}}", num_writers, num_readers, init_pages, - just_init_pages, - clean_before_run, + dropdata, timeout_s, read_delay_ms, num_writer_slots, - avg_page_size_mb, + avg_page_size, fmt::join(paths.begin(), paths.end(), ","), fmt::join(failpoints.begin(), failpoints.end(), ","), + gc_interval_s, status_interval, - situation_mask, verify, - running_ps_version - // + situation_mask, + running_ps_version // ); } diff --git a/dbms/src/Storages/Page/workload/PSWorkload.cpp b/dbms/src/Storages/Page/workload/PSWorkload.cpp index f9ff0b40c08..009106621f7 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.cpp +++ b/dbms/src/Storages/Page/workload/PSWorkload.cpp @@ -17,47 +17,76 @@ #include #include #include +#include #include #include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#include +#pragma GCC diagnostic pop + namespace DB::PS::tests { void StressWorkload::onDumpResult() { UInt64 time_interval = stop_watch.elapsedMilliseconds(); - LOG_INFO(options.logger, fmt::format("result in {}ms", time_interval)); + LOG_INFO(options.logger, "result in {}ms", time_interval); double seconds_run = 1.0 * time_interval / 1000; + Poco::JSON::Object::Ptr details = new Poco::JSON::Object(); + size_t total_pages_written = 0; size_t total_bytes_written = 0; + Poco::JSON::Array::Ptr json_writers(new Poco::JSON::Array()); for (auto & writer : writers) { total_pages_written += writer->pages_used; total_bytes_written += writer->bytes_used; + + Poco::JSON::Object::Ptr json_writer = new Poco::JSON::Object(); + json_writer->set("pages", writer->pages_used); + json_writer->set("bytes", writer->bytes_used); + 
json_writers->add(json_writer); } + details->set("writers", json_writers); size_t total_pages_read = 0; size_t total_bytes_read = 0; + Poco::JSON::Array::Ptr json_readers(new Poco::JSON::Array()); for (auto & reader : readers) { total_pages_read += reader->pages_used; total_bytes_read += reader->bytes_used; + + Poco::JSON::Object::Ptr json_reader = new Poco::JSON::Object(); + json_reader->set("pages", reader->pages_used); + json_reader->set("bytes", reader->bytes_used); + json_readers->add(json_reader); } + details->set("readers", json_readers); + + LOG_INFO(options.logger, "{}", [&]() { + std::stringstream ss; + details->stringify(ss); + return ss.str(); + }()); LOG_INFO(options.logger, - fmt::format( - "W: {} pages, {:.4f} GB, {:.4f} GB/s", - total_pages_written, - static_cast(total_bytes_written) / DB::GB, - static_cast(total_bytes_written) / DB::GB / seconds_run)); + "W: {} pages, {:.4f} GB, {:.4f} GB/s", + total_pages_written, + static_cast(total_bytes_written) / DB::GB, + static_cast(total_bytes_written) / DB::GB / seconds_run); LOG_INFO(options.logger, - fmt::format( - "R: {} pages, {:.4f} GB, {:.4f} GB/s", - total_pages_read, - static_cast(total_bytes_read) / DB::GB, - static_cast(total_bytes_read) / DB::GB / seconds_run)); + "R: {} pages, {:.4f} GB, {:.4f} GB/s", + total_pages_read, + static_cast(total_bytes_read) / DB::GB, + static_cast(total_bytes_read) / DB::GB / seconds_run); if (options.status_interval != 0) { @@ -107,18 +136,37 @@ void StressWorkload::initPageStorage(DB::PageStorageConfig & config, String path (void)page; num_of_pages++; }); - LOG_INFO(StressEnv::logger, fmt::format("Recover {} pages.", num_of_pages)); + LOG_INFO(StressEnv::logger, "Recover {} pages.", num_of_pages); + } + + runtime_stat = std::make_unique(); +} + +void StressWorkload::initPages(const DB::PageId & max_page_id) +{ + auto writer = std::make_shared(ps, 0, runtime_stat); + for (DB::PageId page_id = 0; page_id <= max_page_id; ++page_id) + { + RandomPageId r(page_id); + 
writer->write(r); + if (page_id % 100 == 0) + LOG_INFO(StressEnv::logger, "writer wrote page {}", page_id); } } void StressWorkload::startBackgroundTimer() { // A background thread that do GC - gc = std::make_shared(ps); - gc->start(); + if (options.gc_interval_s > 0) + { + gc = std::make_shared(ps, options.gc_interval_s); + gc->start(); + } - // A background thread that scan all pages - scanner = std::make_shared(ps); + // A background thread that gets snapshot statistics, + // mocking `AsynchronousMetrics`, which reports metrics + // to Grafana. + scanner = std::make_shared(ps); scanner->start(); if (options.status_interval > 0) @@ -136,20 +184,17 @@ void StressWorkload::startBackgroundTimer() } } -void StressWorkloadManger::runWorkload() +void PageWorkloadFactory::runWorkload() { - if (options.just_init_pages || options.situation_mask == NORMAL_WORKLOAD) + if (options.situation_mask == NORMAL_WORKLOAD) { String name; WorkloadCreator func; std::tie(name, func) = get(NORMAL_WORKLOAD); - auto workload = std::shared_ptr(func(options)); - LOG_INFO(StressEnv::logger, fmt::format("Start Running {} , {}", name, workload->desc())); - workload->run(); - if (!options.just_init_pages) - { - workload->onDumpResult(); - } + running_workload = std::shared_ptr(func(options)); + LOG_INFO(StressEnv::logger, "Start Running {}, {}", name, running_workload->desc()); + running_workload->run(); + running_workload->onDumpResult(); return; } @@ -163,18 +208,19 @@ void StressWorkloadManger::runWorkload() { auto & name = it.second.first; auto & creator = it.second.second; - auto workload = creator(options); - LOG_INFO(StressEnv::logger, fmt::format("Start Running {} , {}", name, workload->desc())); - workload->run(); - if (options.verify && !workload->verify()) + running_workload = creator(options); + SCOPE_EXIT({ running_workload.reset(); }); + LOG_INFO(StressEnv::logger, "Start Running {}, {}", name, running_workload->desc()); + running_workload->run(); + if (options.verify && 
!running_workload->verify()) { - LOG_WARNING(StressEnv::logger, fmt::format("work load : {} failed.", name)); - workload->onFailed(); + LOG_WARNING(StressEnv::logger, "work load: {} failed.", name); + running_workload->onFailed(); break; } else { - workload->onDumpResult(); + running_workload->onDumpResult(); } } } diff --git a/dbms/src/Storages/Page/workload/PSWorkload.h b/dbms/src/Storages/Page/workload/PSWorkload.h index 0c048227a4d..35248b6188b 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.h +++ b/dbms/src/Storages/Page/workload/PSWorkload.h @@ -24,6 +24,8 @@ #include #include +#include + #define NORMAL_WORKLOAD 0 namespace DB::PS::tests { @@ -64,18 +66,32 @@ class StressWorkload virtual void onFailed() {} virtual void onDumpResult(); + void stop() + { + if (stress_time) + stress_time->stop(); + if (scanner) + scanner->stop(); + if (gc) + gc->stop(); + if (metrics_dumper) + metrics_dumper->stop(); + } + protected: void initPageStorage(DB::PageStorageConfig & config, String path_prefix = ""); void startBackgroundTimer(); + void initPages(const DB::PageId & max_page_id); + template void startWriter(size_t nums_writers, std::function)> writer_configure = nullptr) { writers.clear(); for (size_t i = 0; i < nums_writers; ++i) { - auto writer = std::make_shared(ps, i); + auto writer = std::make_shared(ps, i, runtime_stat); if (writer_configure) { writer_configure(writer); @@ -91,7 +107,7 @@ class StressWorkload readers.clear(); for (size_t i = 0; i < nums_readers; ++i) { - auto reader = std::make_shared(ps, i); + auto reader = std::make_shared(ps, i, runtime_stat); if (reader_configure) { reader_configure(reader); @@ -108,19 +124,21 @@ class StressWorkload PSPtr ps; DB::PSDiskDelegatorPtr delegator; + std::unique_ptr runtime_stat; + std::list> writers; std::list> readers; Stopwatch stop_watch; StressTimeoutPtr stress_time; - PSScannerPtr scanner; + PSSnapStatGetterPtr scanner; PSGcPtr gc; PSMetricsDumperPtr metrics_dumper; }; -class StressWorkloadManger 
+class PageWorkloadFactory { private: using WorkloadCreator = std::function(const StressEnv &)>; @@ -128,14 +146,14 @@ class StressWorkloadManger std::map> funcs; UInt64 registed_masks = 0; - StressWorkloadManger() = default; + PageWorkloadFactory() = default; public: - DISALLOW_COPY_AND_MOVE(StressWorkloadManger); + DISALLOW_COPY_AND_MOVE(PageWorkloadFactory); - static StressWorkloadManger & getInstance() + static PageWorkloadFactory & getInstance() { - static StressWorkloadManger instance; + static PageWorkloadFactory instance; return instance; } @@ -189,14 +207,21 @@ class StressWorkloadManger void runWorkload(); + void stopWorkload() + { + if (running_workload) + running_workload->stop(); + } + private: StressEnv options; + std::shared_ptr running_workload; }; template void work_load_register() { - StressWorkloadManger::getInstance().reg( + PageWorkloadFactory::getInstance().reg( Workload::nameFunc(), Workload::maskFunc(), [](const StressEnv & opts) -> std::shared_ptr { diff --git a/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h index 15372c4076c..632f99bcb5a 100644 --- a/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h +++ b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h @@ -137,16 +137,13 @@ class PageStorageInMemoryCapacity : public StressWorkload DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); + startBackgroundTimer(); - stress_time = std::make_shared(100); - stress_time->start(); { stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); - writer->setBatchBufferSize(10 * 1024); + writer->setBufferSizeRange(10ULL * 1024, 10ULL * 1024); writer->setPageRange(single_writer_page_nums); }); @@ -161,17 +158,17 @@ class PageStorageInMemoryCapacity : public StressWorkload size_t page_writen = (single_writer_page_nums 
* options.num_writers); assert(page_writen != 0); - LOG_INFO(StressEnv::logger, fmt::format("After gen: {} pages" - "virtual memory used: {} MB," - "resident memory used: {} MB," - "total memory is {} , It is estimated that {} pages can be stored in the virtual memory," - "It is estimated that {} pages can be stored in the resident memory.", - page_writen, - virtual_used / DB::MB, - resident_used / DB::MB, - total_mem, - std::round(virtual_used) ? (total_mem / ((double)virtual_used / page_writen)) : 0, - std::round(resident_used) ? (total_mem / ((double)resident_used / page_writen)) : 0)); + LOG_INFO(StressEnv::logger, "After gen: {} pages" + "virtual memory used: {} MB," + "resident memory used: {} MB," + "total memory is {} , It is estimated that {} pages can be stored in the virtual memory," + "It is estimated that {} pages can be stored in the resident memory.", + page_writen, + virtual_used / DB::MB, + resident_used / DB::MB, + total_mem, + std::round(virtual_used) ? (total_mem / ((double)virtual_used / page_writen)) : 0, + std::round(resident_used) ? 
(total_mem / ((double)resident_used / page_writen)) : 0); } }; } // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/workload/ThousandsOfOffset.h b/dbms/src/Storages/Page/workload/ThousandsOfOffset.h index 4bb7a3042c8..5c4d9cf5cc0 100644 --- a/dbms/src/Storages/Page/workload/ThousandsOfOffset.h +++ b/dbms/src/Storages/Page/workload/ThousandsOfOffset.h @@ -88,8 +88,7 @@ class ThousandsOfOffset : public StressWorkload startWriter(options.num_writers, [field_size, buffer_size](std::shared_ptr writer) -> void { writer->setFieldSize(field_size); writer->setBatchBufferNums(1); - writer->setBatchBufferSize(buffer_size); - writer->setWindowSize(500); + writer->setBufferSizeRange(buffer_size, buffer_size); writer->setNormalDistributionSigma(13); }); @@ -110,8 +109,7 @@ class ThousandsOfOffset : public StressWorkload startWriter(options.num_writers, [field_size, buffer_size](std::shared_ptr writer) -> void { writer->setFieldSize(field_size); writer->setBatchBufferNums(20); - writer->setBatchBufferSize(buffer_size); - writer->setWindowSize(500); + writer->setBufferSizeRange(buffer_size, buffer_size); writer->setNormalDistributionSigma(13); }); @@ -132,8 +130,7 @@ class ThousandsOfOffset : public StressWorkload startWriter(options.num_writers, [field_size, buffer_size](std::shared_ptr writer) -> void { writer->setFieldSize(field_size); writer->setBatchBufferNums(1); - writer->setBatchBufferSize(buffer_size); - writer->setWindowSize(500); + writer->setBufferSizeRange(buffer_size, buffer_size); writer->setNormalDistributionSigma(13); }); @@ -154,8 +151,7 @@ class ThousandsOfOffset : public StressWorkload startWriter(options.num_writers, [field_size, buffer_size](std::shared_ptr writer) -> void { writer->setFieldSize(field_size); writer->setBatchBufferNums(20); - writer->setBatchBufferSize(buffer_size); - writer->setWindowSize(500); + writer->setBufferSizeRange(buffer_size, buffer_size); writer->setNormalDistributionSigma(13); });