From 01240cf0653910fdbacafa5668b8f429ec894f84 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 24 Oct 2022 14:10:16 +0800 Subject: [PATCH 01/12] Setup the configs --- .../Page/workload/HeavyMemoryCostInGC.h | 8 ++-- .../Page/workload/HeavySkewWriteRead.h | 12 ++---- .../Page/workload/HighValidBigFileGC.h | 4 +- .../Page/workload/HoldSnapshotsLongTime.h | 11 +---- dbms/src/Storages/Page/workload/Normal.h | 4 +- .../Storages/Page/workload/PSBackground.cpp | 6 +-- .../src/Storages/Page/workload/PSBackground.h | 12 +++--- .../src/Storages/Page/workload/PSRunnable.cpp | 6 +-- .../Storages/Page/workload/PSStressEnv.cpp | 13 ++++-- dbms/src/Storages/Page/workload/PSStressEnv.h | 19 +++++---- .../src/Storages/Page/workload/PSWorkload.cpp | 41 ++++++++++--------- dbms/src/Storages/Page/workload/PSWorkload.h | 2 +- 12 files changed, 70 insertions(+), 68 deletions(-) diff --git a/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h index 9bd2fd44a39..e8d66cbad88 100644 --- a/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h +++ b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h @@ -55,7 +55,7 @@ class HeavyMemoryCostInGC metrics_dumper = std::make_shared(1); metrics_dumper->start(); - stress_time = std::make_shared(30); + stress_time = std::make_shared(options.timeout_s); stress_time->start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { @@ -66,7 +66,7 @@ class HeavyMemoryCostInGC pool.joinAll(); stop_watch.stop(); - gc = std::make_shared(ps); + gc = std::make_shared(ps, options.gc_interval_s); gc->doGcOnce(); } @@ -78,7 +78,9 @@ class HeavyMemoryCostInGC void onFailed() override { LOG_WARNING(StressEnv::logger, - fmt::format("Memory Peak is {} , it should not bigger than {} ", metrics_dumper->getMemoryPeak(), 5 * 1024 * 1024)); + "Memory Peak is {}, it should not bigger than {}", + metrics_dumper->getMemoryPeak(), + 5 * 1024 * 1024); } }; } // namespace DB::PS::tests diff --git 
a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h index 71f0df43af9..3579993596b 100644 --- a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h +++ b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h @@ -51,22 +51,18 @@ class HeavySkewWriteRead : public StressWorkload DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); + startBackgroundTimer(); - stress_time = std::make_shared(60); - stress_time->start(); { stop_watch.start(); - startWriter(options.num_writers, [](std::shared_ptr writer) -> void { + const auto num_writers = options.num_writers; + startWriter(num_writers, [&](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); - writer->setBatchBufferRange(10 * 1024, 1 * DB::MB); + writer->setBatchBufferRange(0, options.avg_page_size * 2); writer->setWindowSize(500); writer->setNormalDistributionSigma(13); }); - auto num_writers = options.num_writers; - startReader(options.num_readers, [num_writers](std::shared_ptr reader) -> void { reader->setPageReadOnce(5); reader->setReadDelay(0); diff --git a/dbms/src/Storages/Page/workload/HighValidBigFileGC.h b/dbms/src/Storages/Page/workload/HighValidBigFileGC.h index 87405b0ef3f..cbccd0aa58a 100644 --- a/dbms/src/Storages/Page/workload/HighValidBigFileGC.h +++ b/dbms/src/Storages/Page/workload/HighValidBigFileGC.h @@ -96,7 +96,7 @@ class HighValidBigFileGCWorkload onDumpResult(); } - gc = std::make_shared(ps); + gc = std::make_shared(ps, options.gc_interval_s); gc->doGcOnce(); gc_time_ms = gc->getElapsedMilliseconds(); { @@ -123,7 +123,7 @@ class HighValidBigFileGCWorkload void onFailed() override { - LOG_WARNING(StressEnv::logger, fmt::format("GC time is {} , it should not bigger than {} ", gc_time_ms, 1 * 1000)); + LOG_WARNING(StressEnv::logger, "GC time is {} , it should not bigger than {} ", gc_time_ms, 1 * 1000); } private: diff --git 
a/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h index 81816f72375..9f032e34c94 100644 --- a/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h +++ b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h @@ -52,14 +52,7 @@ class HoldSnapshotsLongTime : public StressWorkload DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); - - stress_time = std::make_shared(60); - stress_time->start(); - - scanner = std::make_shared(ps); - scanner->start(); + startBackgroundTimer(); // 90-100 snapshots will be generated. { @@ -79,7 +72,7 @@ class HoldSnapshotsLongTime : public StressWorkload stop_watch.stop(); } - gc = std::make_shared(ps); + gc = std::make_shared(ps, options.gc_interval_s); // Normal GC gc->doGcOnce(); diff --git a/dbms/src/Storages/Page/workload/Normal.h b/dbms/src/Storages/Page/workload/Normal.h index 033f226bf89..a49c3f27c95 100644 --- a/dbms/src/Storages/Page/workload/Normal.h +++ b/dbms/src/Storages/Page/workload/Normal.h @@ -48,9 +48,9 @@ class NormalWorkload config.num_write_slots = options.num_writer_slots; initPageStorage(config); - if (options.avg_page_size_mb != 0) + if (options.avg_page_size != 0) { - PSWriter::setApproxPageSize(options.avg_page_size_mb); + PSWriter::setApproxPageSize(options.avg_page_size); } // init all pages in PageStorage diff --git a/dbms/src/Storages/Page/workload/PSBackground.cpp b/dbms/src/Storages/Page/workload/PSBackground.cpp index 7cfc335c3a9..b4e31f5ef9f 100644 --- a/dbms/src/Storages/Page/workload/PSBackground.cpp +++ b/dbms/src/Storages/Page/workload/PSBackground.cpp @@ -71,7 +71,7 @@ void PSGc::start() gc_timer.start(Poco::TimerCallback(*this, &PSGc::onTime)); } -void PSScanner::onTime(Poco::Timer & /*timer*/) +void PSSnapStatGetter::onTime(Poco::Timer & /*timer*/) { try { @@ -94,9 +94,9 @@ void PSScanner::onTime(Poco::Timer & /*timer*/) } } -void PSScanner::start() 
+void PSSnapStatGetter::start() { - scanner_timer.start(Poco::TimerCallback(*this, &PSScanner::onTime)); + scanner_timer.start(Poco::TimerCallback(*this, &PSSnapStatGetter::onTime)); } // NOLINTNEXTLINE(readability-convert-member-functions-to-static) diff --git a/dbms/src/Storages/Page/workload/PSBackground.h b/dbms/src/Storages/Page/workload/PSBackground.h index c91dad1361f..edc3bf03f67 100644 --- a/dbms/src/Storages/Page/workload/PSBackground.h +++ b/dbms/src/Storages/Page/workload/PSBackground.h @@ -99,12 +99,12 @@ class PSGc PSPtr ps; public: - explicit PSGc(const PSPtr & ps_) + explicit PSGc(const PSPtr & ps_, uint64_t interval) : ps(ps_) { assert(ps != nullptr); gc_timer.setStartInterval(1000); - gc_timer.setPeriodicInterval(30 * 1000); + gc_timer.setPeriodicInterval(interval * 1000); } void doGcOnce(); @@ -124,12 +124,12 @@ class PSGc }; using PSGcPtr = std::shared_ptr; -class PSScanner +class PSSnapStatGetter { PSPtr ps; public: - explicit PSScanner(const PSPtr & ps_) + explicit PSSnapStatGetter(const PSPtr & ps_) : ps(ps_) { assert(ps != nullptr); @@ -145,7 +145,7 @@ class PSScanner private: Poco::Timer scanner_timer; }; -using PSScannerPtr = std::shared_ptr; +using PSSnapStatGetterPtr = std::shared_ptr; class StressTimeout { @@ -153,7 +153,7 @@ class StressTimeout explicit StressTimeout(size_t timeout_s) { StressEnvStatus::getInstance().setStat(STATUS_LOOP); - LOG_INFO(StressEnv::logger, fmt::format("Timeout: {}s", timeout_s)); + LOG_INFO(StressEnv::logger, "Timeout: {}s", timeout_s); timeout_timer.setStartInterval(timeout_s * 1000); } diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index 01fd89c0377..db37d74e144 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -57,10 +57,10 @@ size_t PSRunnable::getPagesUsed() const } size_t PSWriter::approx_page_mb = 2; -void PSWriter::setApproxPageSize(size_t size_mb) +void 
PSWriter::setApproxPageSize(size_t size) { - LOG_INFO(StressEnv::logger, "Page approx size is set to {} MB", size_mb); - approx_page_mb = size_mb; + LOG_INFO(StressEnv::logger, "Page approx size is set to {} MB", formatReadableSizeWithBinarySuffix(size)); + approx_page_mb = size * 1024 * 1024; } DB::ReadBufferPtr PSWriter::genRandomData(const DB::PageId pageId, DB::MemHolder & holder) diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.cpp b/dbms/src/Storages/Page/workload/PSStressEnv.cpp index 288c8b13c2c..82580954802 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.cpp +++ b/dbms/src/Storages/Page/workload/PSStressEnv.cpp @@ -27,6 +27,7 @@ #include #include +#include "Common/Exception.h" namespace DB::PS::tests { @@ -55,10 +56,11 @@ StressEnv StressEnv::parse(int argc, char ** argv) ("timeout,T", value()->default_value(600), "maximum run time (seconds). 0 means run infinitely") // ("writer_slots", value()->default_value(4), "number of PageStorage writer slots") // ("read_delay_ms", value()->default_value(0), "millionseconds of read delay") // - ("avg_page_size", value()->default_value(1), "avg size for each page(MiB)") // + ("avg_page_size", value()->default_value(1 * 1024 * 1024), "avg size for each page(bytes). 1MiB by default") // ("paths,P", value>(), "store path(s)") // - ("failpoints,F", value>(), "failpoint(s) to enable") // - ("status_interval,S", value()->default_value(1), "Status statistics interval. 0 means no statistics") // + ("failpoints", value>(), "failpoint(s) to enable") // + ("gc_interval", value()->default_value(30), "GC interval(seconds). 0 means no gc") // + ("status_interval", value()->default_value(1), "Status statistics interval(seconds). 
0 means no statistics") // ("situation_mask,M", value()->default_value(0), "Run special tests sequentially, example -M 2") // ("verify", value()->default_value(true), "Run special tests sequentially with verify.") // ("running_ps_version,V", value()->default_value(3), "Select a version of PageStorage. 2 or 3 can used"); @@ -83,7 +85,8 @@ StressEnv StressEnv::parse(int argc, char ** argv) opt.timeout_s = options["timeout"].as(); opt.read_delay_ms = options["read_delay_ms"].as(); opt.num_writer_slots = options["writer_slots"].as(); - opt.avg_page_size_mb = options["avg_page_size"].as(); + opt.avg_page_size = options["avg_page_size"].as(); + opt.gc_interval_s = options["gc_interval"].as(); opt.status_interval = options["status_interval"].as(); opt.situation_mask = options["situation_mask"].as(); opt.verify = options["verify"].as(); @@ -96,6 +99,8 @@ StressEnv StressEnv::parse(int argc, char ** argv) exit(0); } + RUNTIME_CHECK(opt.avg_page_size > 0); + if (options.count("paths")) opt.paths = options["paths"].as>(); else diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.h b/dbms/src/Storages/Page/workload/PSStressEnv.h index e67cb325430..c015d23f6ea 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.h +++ b/dbms/src/Storages/Page/workload/PSStressEnv.h @@ -79,10 +79,11 @@ struct StressEnv bool init_pages = false; bool just_init_pages = false; bool clean_before_run = false; + size_t gc_interval_s = 30; size_t timeout_s = 0; size_t read_delay_ms = 0; size_t num_writer_slots = 1; - size_t avg_page_size_mb = 1; + size_t avg_page_size = 1024 * 1024; size_t status_interval = 1; size_t situation_mask = 0; bool verify = true; @@ -97,9 +98,11 @@ struct StressEnv "{{ " "num_writers: {}, num_readers: {}, init_pages: {}, just_init_pages: {}" ", clean_before_run: {}, timeout_s: {}, read_delay_ms: {}, num_writer_slots: {}" - ", avg_page_size_mb: {}, paths: [{}], failpoints: [{}]" - ", status_interval: {}, situation_mask: {}, verify: {}" - ", running_pagestorage_version : 
{}." + ", avg_page_size: {}, paths: [{}], failpoints: [{}]" + ", gc_interval_s: {}" + ", status_interval: {}, verify: {}" + ", situation_mask: {}" + ", running_pagestorage_version: {}" "}}", num_writers, num_readers, @@ -109,14 +112,14 @@ struct StressEnv timeout_s, read_delay_ms, num_writer_slots, - avg_page_size_mb, + avg_page_size, fmt::join(paths.begin(), paths.end(), ","), fmt::join(failpoints.begin(), failpoints.end(), ","), + gc_interval_s, status_interval, - situation_mask, verify, - running_ps_version - // + situation_mask, + running_ps_version // ); } diff --git a/dbms/src/Storages/Page/workload/PSWorkload.cpp b/dbms/src/Storages/Page/workload/PSWorkload.cpp index f9ff0b40c08..3d0bfe0310f 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.cpp +++ b/dbms/src/Storages/Page/workload/PSWorkload.cpp @@ -25,7 +25,7 @@ namespace DB::PS::tests void StressWorkload::onDumpResult() { UInt64 time_interval = stop_watch.elapsedMilliseconds(); - LOG_INFO(options.logger, fmt::format("result in {}ms", time_interval)); + LOG_INFO(options.logger, "result in {}ms", time_interval); double seconds_run = 1.0 * time_interval / 1000; size_t total_pages_written = 0; @@ -47,17 +47,15 @@ void StressWorkload::onDumpResult() } LOG_INFO(options.logger, - fmt::format( - "W: {} pages, {:.4f} GB, {:.4f} GB/s", - total_pages_written, - static_cast(total_bytes_written) / DB::GB, - static_cast(total_bytes_written) / DB::GB / seconds_run)); + "W: {} pages, {:.4f} GB, {:.4f} GB/s", + total_pages_written, + static_cast(total_bytes_written) / DB::GB, + static_cast(total_bytes_written) / DB::GB / seconds_run); LOG_INFO(options.logger, - fmt::format( - "R: {} pages, {:.4f} GB, {:.4f} GB/s", - total_pages_read, - static_cast(total_bytes_read) / DB::GB, - static_cast(total_bytes_read) / DB::GB / seconds_run)); + "R: {} pages, {:.4f} GB, {:.4f} GB/s", + total_pages_read, + static_cast(total_bytes_read) / DB::GB, + static_cast(total_bytes_read) / DB::GB / seconds_run); if (options.status_interval 
!= 0) { @@ -107,18 +105,23 @@ void StressWorkload::initPageStorage(DB::PageStorageConfig & config, String path (void)page; num_of_pages++; }); - LOG_INFO(StressEnv::logger, fmt::format("Recover {} pages.", num_of_pages)); + LOG_INFO(StressEnv::logger, "Recover {} pages.", num_of_pages); } } void StressWorkload::startBackgroundTimer() { // A background thread that do GC - gc = std::make_shared(ps); - gc->start(); + if (options.gc_interval_s > 0) + { + gc = std::make_shared(ps, options.gc_interval_s); + gc->start(); + } - // A background thread that scan all pages - scanner = std::make_shared(ps); + // A background thread that get snapshot statics, + // mock `AsynchronousMetrics` that report metrics + // to grafana. + scanner = std::make_shared(ps); scanner->start(); if (options.status_interval > 0) @@ -144,7 +147,7 @@ void StressWorkloadManger::runWorkload() WorkloadCreator func; std::tie(name, func) = get(NORMAL_WORKLOAD); auto workload = std::shared_ptr(func(options)); - LOG_INFO(StressEnv::logger, fmt::format("Start Running {} , {}", name, workload->desc())); + LOG_INFO(StressEnv::logger, "Start Running {} , {}", name, workload->desc()); workload->run(); if (!options.just_init_pages) { @@ -164,11 +167,11 @@ void StressWorkloadManger::runWorkload() auto & name = it.second.first; auto & creator = it.second.second; auto workload = creator(options); - LOG_INFO(StressEnv::logger, fmt::format("Start Running {} , {}", name, workload->desc())); + LOG_INFO(StressEnv::logger, "Start Running {} , {}", name, workload->desc()); workload->run(); if (options.verify && !workload->verify()) { - LOG_WARNING(StressEnv::logger, fmt::format("work load : {} failed.", name)); + LOG_WARNING(StressEnv::logger, "work load : {} failed.", name); workload->onFailed(); break; } diff --git a/dbms/src/Storages/Page/workload/PSWorkload.h b/dbms/src/Storages/Page/workload/PSWorkload.h index 0c048227a4d..8c2bc85ef75 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.h +++ 
b/dbms/src/Storages/Page/workload/PSWorkload.h @@ -114,7 +114,7 @@ class StressWorkload Stopwatch stop_watch; StressTimeoutPtr stress_time; - PSScannerPtr scanner; + PSSnapStatGetterPtr scanner; PSGcPtr gc; PSMetricsDumperPtr metrics_dumper; }; From 4aac9412f3e9c13f78e9ca120dbd960b6e22972a Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 24 Oct 2022 21:23:56 +0800 Subject: [PATCH 02/12] save --- .../Page/workload/HeavySkewWriteRead.h | 6 +- .../Page/workload/HoldSnapshotsLongTime.h | 1 - dbms/src/Storages/Page/workload/MainEntry.cpp | 4 +- .../src/Storages/Page/workload/PSBackground.h | 19 ++ .../src/Storages/Page/workload/PSRunnable.cpp | 195 ++++++++++-------- dbms/src/Storages/Page/workload/PSRunnable.h | 79 ++++--- .../Storages/Page/workload/PSStressEnv.cpp | 7 +- .../src/Storages/Page/workload/PSWorkload.cpp | 29 +-- dbms/src/Storages/Page/workload/PSWorkload.h | 39 +++- .../Page/workload/ThousandsOfOffset.h | 4 - 10 files changed, 235 insertions(+), 148 deletions(-) diff --git a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h index 3579993596b..fbefd7b02f6 100644 --- a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h +++ b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h @@ -59,16 +59,14 @@ class HeavySkewWriteRead : public StressWorkload startWriter(num_writers, [&](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); writer->setBatchBufferRange(0, options.avg_page_size * 2); - writer->setWindowSize(500); - writer->setNormalDistributionSigma(13); + writer->setNormalDistributionSigma(250); }); startReader(options.num_readers, [num_writers](std::shared_ptr reader) -> void { reader->setPageReadOnce(5); reader->setReadDelay(0); reader->setWriterNums(num_writers); - reader->setWindowSize(100); - reader->setNormalDistributionSigma(9); + reader->setNormalDistributionSigma(250); }); pool.joinAll(); diff --git a/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h 
b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h index 9f032e34c94..e4789271d75 100644 --- a/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h +++ b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h @@ -60,7 +60,6 @@ class HoldSnapshotsLongTime : public StressWorkload startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); writer->setBatchBufferRange(10 * 1024, 1 * DB::MB); - writer->setWindowSize(500); writer->setNormalDistributionSigma(13); }); diff --git a/dbms/src/Storages/Page/workload/MainEntry.cpp b/dbms/src/Storages/Page/workload/MainEntry.cpp index 18e42106c90..19be50e4924 100644 --- a/dbms/src/Storages/Page/workload/MainEntry.cpp +++ b/dbms/src/Storages/Page/workload/MainEntry.cpp @@ -44,7 +44,7 @@ int StressWorkload::mainEntry(int argc, char ** argv) auto env = StressEnv::parse(argc, argv); env.setup(); - auto & mamager = StressWorkloadManger::getInstance(); + auto & mamager = PageWorkloadFactory::getInstance(); mamager.setEnv(env); mamager.runWorkload(); @@ -55,4 +55,4 @@ int StressWorkload::mainEntry(int argc, char ** argv) DB::tryLogCurrentException(""); exit(-1); } -} \ No newline at end of file +} diff --git a/dbms/src/Storages/Page/workload/PSBackground.h b/dbms/src/Storages/Page/workload/PSBackground.h index edc3bf03f67..5474c8cf013 100644 --- a/dbms/src/Storages/Page/workload/PSBackground.h +++ b/dbms/src/Storages/Page/workload/PSBackground.h @@ -58,6 +58,11 @@ class PSMetricsDumper void start(); + void stop() + { + timer_status.stop(); + } + UInt32 getMemoryPeak() const { auto info = metrics.find(CurrentMetrics::MemoryTracking); @@ -113,6 +118,11 @@ class PSGc void start(); + void stop() + { + gc_timer.stop(); + } + UInt64 getElapsedMilliseconds() { return gc_stop_watch.elapsedMilliseconds(); @@ -142,6 +152,11 @@ class PSSnapStatGetter void start(); + void stop() + { + scanner_timer.stop(); + } + private: Poco::Timer scanner_timer; }; @@ -159,6 +174,10 @@ class StressTimeout void 
onTime(Poco::Timer & timer); void start(); + void stop() + { + timeout_timer.stop(); + } private: Poco::Timer timeout_timer; diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index db37d74e144..b2c934e2844 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include #include #include #include +#include #include +#include #include +#include #include #include @@ -43,6 +47,8 @@ try } catch (...) { + // stop the whole testing + StressEnvStatus::getInstance().setStat(StressEnvStat::STATUS_EXCEPTION); DB::tryLogCurrentException(StressEnv::logger); } @@ -123,21 +129,26 @@ void PSWriter::fillAllPages(const PSPtr & ps) bool PSWriter::runImpl() { - const DB::PageId page_id = genRandomPageId(); + const auto r = genRandomPageId(); updatedRandomData(); DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; - wb.putPage(page_id, 0, buff_ptr, buff_ptr->buffer().size()); + wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size()); + for (const auto id : r.page_id_to_remove) + wb.delPage(id); ps->write(std::move(wb)); ++pages_used; bytes_used += buff_ptr->buffer().size(); + + // verbose logging for debug + // LOG_TRACE(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); return true; } -DB::PageId PSWriter::genRandomPageId() +RandomPageId PSWriter::genRandomPageId() { std::normal_distribution<> distribution{static_cast(max_page_id) / 2, 150}; - return static_cast(std::round(distribution(gen))) % max_page_id; + return RandomPageId(static_cast(std::round(distribution(gen))) % max_page_id); } void PSCommonWriter::updatedRandomData() @@ -170,23 +181,26 @@ void PSCommonWriter::updatedRandomData() } } -DB::PageId writing_page[1000]; - bool PSCommonWriter::runImpl() { - const DB::PageId page_id 
= genRandomPageId(); + const auto r = genRandomPageId(); DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; updatedRandomData(); + // FIXME: update one page_id by multiple data in one write batch? for (auto & buffptr : buff_ptrs) { - wb.putPage(page_id, 0, buffptr, buffptr->buffer().size()); + wb.putPage(r.page_id, 0, buffptr, buffptr->buffer().size()); ++pages_used; bytes_used += buffptr->buffer().size(); } + for (const auto & page_id : r.page_id_to_remove) + wb.delPage(page_id); ps->write(std::move(wb)); + // verbose logging for debug + // LOG_DEBUG(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); return (batch_buffer_limit == 0 || bytes_used < batch_buffer_limit); } @@ -210,19 +224,17 @@ void PSCommonWriter::setBatchBufferPageRange(size_t max_page_id_) max_page_id = max_page_id_; } -DB::PageId PSCommonWriter::genRandomPageId() +RandomPageId PSCommonWriter::genRandomPageId() { std::uniform_int_distribution<> dist(0, max_page_id); - return static_cast(dist(gen)); + return RandomPageId(static_cast(dist(gen))); } void PSCommonWriter::setBatchBufferRange(size_t min, size_t max) { - if (max > min && min > 0) - { - buffer_size_min = min; - buffer_size_max = max; - } + RUNTIME_CHECK(max >= min); + buffer_size_min = std::max(1, min); + buffer_size_max = max; } void PSCommonWriter::setFieldSize(const DB::PageFieldSizes & data_sizes_) @@ -245,7 +257,7 @@ size_t PSCommonWriter::genBufferSize() DB::PageIds PSReader::genRandomPageIds() { DB::PageIds page_ids; - for (size_t i = 0; i < page_read_once; ++i) + for (size_t i = 0; i < num_pages_read; ++i) { std::uniform_int_distribution<> dist(0, max_page_id); page_ids.emplace_back(static_cast(dist(gen))); @@ -256,6 +268,8 @@ DB::PageIds PSReader::genRandomPageIds() bool PSReader::runImpl() { DB::PageIds page_ids = genRandomPageIds(); + if (page_ids.empty()) + return true; auto page_map = ps->read(DB::TEST_NAMESPACE_ID, page_ids); for (const auto & page : page_map) @@ -272,7 +286,7 @@ bool 
PSReader::runImpl() void PSReader::setPageReadOnce(size_t page_read_once_) { - page_read_once = page_read_once_; + num_pages_read = page_read_once_; } void PSReader::setReadDelay(size_t delay_ms) @@ -287,12 +301,7 @@ void PSReader::setReadPageRange(size_t max_page_id_) void PSReader::setReadPageNums(size_t page_read_once_) { - page_read_once = page_read_once_; -} - -void PSWindowWriter::setWindowSize(size_t window_size_) -{ - window_size = window_size_; + num_pages_read = page_read_once_; } void PSWindowWriter::setNormalDistributionSigma(size_t sigma_) @@ -300,32 +309,49 @@ void PSWindowWriter::setNormalDistributionSigma(size_t sigma_) sigma = sigma_; } -UInt64 pageid_boundary = 0; -std::mutex page_id_mutex; - -DB::PageId PSWindowWriter::genRandomPageId() +RandomPageId PSWindowWriter::genRandomPageId() { - std::lock_guard page_id_lock(page_id_mutex); - if (pageid_boundary < (window_size / 2)) - { - writing_page[index] = pageid_boundary++; - return static_cast(writing_page[index]); - } - - // Generate a random number in the window - std::normal_distribution<> distribution{static_cast(window_size), static_cast(sigma)}; - auto random = std::round(distribution(gen)); - // Move this "random" near the pageid_boundary, If "random" is still negative, then make it positive - random = std::abs(random + pageid_boundary); + std::lock_guard page_id_lock(global_stat->mtx_page_id); + DB::PageIdSet ids_to_del; + DB::PageId page_id = [this, &ids_to_del]() { + if (global_stat->right_id_boundary < 4 * sigma) + { + return global_stat->right_id_boundary++; + } - auto page_id = static_cast(random > pageid_boundary ? 
pageid_boundary++ : random); - writing_page[index] = page_id; - return page_id; -} + // Generate a random number in the window, normal dist by μ=0 and σ=sigma + std::normal_distribution distribution{0.0, static_cast(sigma)}; + auto random = std::round(distribution(gen)); + // 100 - (100 - 68)/2 == 84% probability that update the existing page id + if (random <= sigma) + { + // Move this "random" near the right boundary - σ, (mock a hot write in an id range) + // we will update the data in this page_id + DB::PageId page_id = std::abs(global_stat->right_id_boundary - sigma + random); + return std::max(page_id, global_stat->left_id_boundary.load()); + } -void PSWindowReader::setWindowSize(size_t window_size_) -{ - window_size = window_size_; + // Else it is about 16% probability that we create a new page. + // Also we consider the pages with id less than (right boundary - 4σ) have no chance (less than 0.01% + // by the definition of normal distribution) for being read later, remove the pages. 
+ DB::PageId left_boundary = 0; + if (global_stat->right_id_boundary > 3 * sigma) // ensure the new left boundary is not negative + left_boundary = global_stat->right_id_boundary - 3 * sigma; + global_stat->left_id_boundary = left_boundary; + + // Remove the page id that is not likely update/read any more + const auto old_removed_id = global_stat->last_removed_page_id; + for (size_t i = old_removed_id; i < left_boundary; ++i) + ids_to_del.insert(i); + global_stat->last_removed_page_id = left_boundary; + + auto page_id = global_stat->right_id_boundary++; + if (page_id % 200 == 0) + LOG_INFO(StressEnv::logger, "Remove old id range [{}, {}), update boundary to [{}, {})", old_removed_id, left_boundary, left_boundary, global_stat->right_id_boundary); + return page_id; + }(); + global_stat->writing_page[index] = page_id; + return RandomPageId(page_id, ids_to_del); } void PSWindowReader::setNormalDistributionSigma(size_t sigma_) @@ -340,50 +366,43 @@ void PSWindowReader::setWriterNums(size_t writer_nums_) DB::PageIds PSWindowReader::genRandomPageIds() { - std::vector page_ids; + const auto page_id_boundary_copy = global_stat->right_id_boundary.load(); + // Nothing to read + if (page_id_boundary_copy < (writer_nums + num_pages_read)) + return {}; + if (global_stat->left_id_boundary.load() == 0) + return {}; + + const size_t read_right_boundary = page_id_boundary_copy - writer_nums - num_pages_read; + + // Generate a random number in the window, normal dist by μ=0 and σ=sigma + std::normal_distribution<> distribution{0.0, static_cast(sigma)}; + double r = distribution(gen); + // id > (right boundary+σ) is likely not written, turn the `r` into the left side of boundary + // for reading + if (r > sigma) + r = -r; + double rand_id = std::round(read_right_boundary - sigma + r); // the rand_id is double since it could be < 0.0 + // Limit by boundary + rand_id = std::max(rand_id, global_stat->left_id_boundary.load()); + rand_id = std::min(rand_id, read_right_boundary); - if 
(pageid_boundary <= (writer_nums + page_read_once)) - { - // Nothing to read - return page_ids; - } - - size_t read_boundary = pageid_boundary - writer_nums - page_read_once; - if (read_boundary < window_size) - { - return page_ids; - } - - std::normal_distribution<> distribution{static_cast(window_size), - static_cast(sigma)}; - auto rand_id = std::round(distribution(gen)); - - rand_id = read_boundary - window_size + rand_id; - - // Bigger than window right boundary - if (rand_id > read_boundary) - { - rand_id = read_boundary; - } - - // Smaller than window left boundary - if (rand_id < 0) - { - rand_id = std::abs(rand_id); - } - - for (size_t i = rand_id; i < page_read_once + rand_id; ++i) + DB::PageIds page_ids; + for (size_t id = rand_id; id < num_pages_read + rand_id; ++id) { - bool writing = false; - for (size_t j = 0; j < writer_nums; j++) - { - if (i == writing_page[j]) + bool is_being_write = [this](DB::PageId page_id) { + std::lock_guard page_id_lock(global_stat->mtx_page_id); + for (size_t j = 0; j < writer_nums; j++) { - writing = true; + if (page_id == global_stat->writing_page[j]) + { + return true; + } } - } - if (!writing) - page_ids.emplace_back(i); + return false; + }(id); + if (!is_being_write) + page_ids.emplace_back(id); } return page_ids; @@ -412,8 +431,8 @@ void PSIncreaseWriter::setPageRange(size_t page_range) end_page_id = (index + 1) * page_range + 1; } -DB::PageId PSIncreaseWriter::genRandomPageId() +RandomPageId PSIncreaseWriter::genRandomPageId() { - return static_cast(begin_page_id++); + return RandomPageId(begin_page_id++); } } // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/workload/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h index b723236391d..79d3f91af73 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -36,14 +36,43 @@ class PSRunnable : public Poco::Runnable size_t pages_used = 0; }; +struct RandomPageId +{ + DB::PageId page_id; + 
DB::PageIdSet page_id_to_remove; + + explicit RandomPageId(DB::PageId new_page_id) + : page_id(new_page_id) + {} + + RandomPageId(DB::PageId new_page_id, DB::PageIdSet page_id_to_remove_) + : page_id(new_page_id) + , page_id_to_remove(page_id_to_remove_) + { + } +}; + +struct GlobalStat +{ + // shared status between PSWindowWriter and PSWindowReader + std::mutex mtx_page_id; + // The page ids between [left_id_boundary, right_id_boundary) + // and not exists in `writing_page` is readable + std::atomic right_id_boundary = 0; + std::atomic left_id_boundary = 0; + DB::PageId last_removed_page_id = 0; + DB::PageId writing_page[1000]; +}; + class PSWriter : public PSRunnable { static size_t approx_page_mb; public: - PSWriter(const PSPtr & ps_, DB::UInt32 index_) + PSWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) : ps(ps_) , index(index_) + , global_stat(global_stat_) { gen.seed(time(nullptr)); } @@ -72,7 +101,7 @@ class PSWriter : public PSRunnable bool runImpl() override; protected: - virtual DB::PageId genRandomPageId(); + virtual RandomPageId genRandomPageId(); protected: PSPtr ps; @@ -81,6 +110,7 @@ class PSWriter : public PSRunnable DB::PageId max_page_id = MAX_PAGE_ID_DEFAULT; char * memory = nullptr; DB::ReadBufferPtr buff_ptr; + const std::unique_ptr & global_stat; }; @@ -89,8 +119,8 @@ class PSWriter : public PSRunnable class PSCommonWriter : public PSWriter { public: - PSCommonWriter(const PSPtr & ps_, DB::UInt32 index_) - : PSWriter(ps_, index_) + PSCommonWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSWriter(ps_, index_, global_stat_) {} void updatedRandomData() override; @@ -122,15 +152,15 @@ class PSCommonWriter : public PSWriter DB::PageFieldSizes data_sizes = {}; - DB::PageId genRandomPageId() override; + RandomPageId genRandomPageId() override; virtual size_t genBufferSize(); }; - // PSWindowsWriter can better simulate the user's workload in cooperation with PSWindowsReader // 
It can also be used as an independent writer to imitate user writing. // When the user is using TiFlash, The Pageid which in PageStorage should be continuously incremented. -// In the meantime, The PageId near the end may be constantly updated.So PSWindowsWriter looks like: +// In the meantime, The PageId near the end may be constantly updated. So the page ids updated by +// this class look like: // // // | Pageid 1 Pageid 100 Pageid N | @@ -138,25 +168,23 @@ class PSCommonWriter : public PSWriter // | | // | window | // -// Every time the pageid written will be generated from the "window" range. -// And The random from the window should conform to the normal distribution. +// Every time the page_id written will be generated by a normal distribution (mean 0, stddev `sigma`) placed near the right id boundary. // When random number bigger than Pageid N, it will be Pageid N + 1, it means new page come. // When random number smaller than Pageid N, it means Page updated. +// And `genRandomPageId` will also return the page ids that are not likely to be updated later, so that the writer can remove them. 
class PSWindowWriter : public PSCommonWriter { public: - PSWindowWriter(const PSPtr & ps_, DB::UInt32 index_) - : PSCommonWriter(ps_, index_) + PSWindowWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSCommonWriter(ps_, index_, global_stat_) {} String description() override { return fmt::format("(Stress Test Window Writer {})", index); } - void setWindowSize(size_t window_size); - void setNormalDistributionSigma(size_t sigma); protected: - DB::PageId genRandomPageId() override; + RandomPageId genRandomPageId() override; protected: size_t window_size = 100; @@ -166,8 +194,8 @@ class PSWindowWriter : public PSCommonWriter class PSIncreaseWriter : public PSCommonWriter { public: - PSIncreaseWriter(const PSPtr & ps_, DB::UInt32 index_) - : PSCommonWriter(ps_, index_) + PSIncreaseWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSCommonWriter(ps_, index_, global_stat_) {} String description() override { return fmt::format("(Stress Test Increase Writer {})", index); } @@ -177,7 +205,7 @@ class PSIncreaseWriter : public PSCommonWriter void setPageRange(size_t page_range); protected: - DB::PageId genRandomPageId() override; + RandomPageId genRandomPageId() override; protected: size_t begin_page_id = 1; @@ -187,9 +215,10 @@ class PSIncreaseWriter : public PSCommonWriter class PSReader : public PSRunnable { public: - PSReader(const PSPtr & ps_, DB::UInt32 index_) + PSReader(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) : ps(ps_) , index(index_) + , global_stat(global_stat_) { gen.seed(time(nullptr)); } @@ -213,9 +242,10 @@ class PSReader : public PSRunnable PSPtr ps; std::mt19937 gen; size_t heavy_read_delay_ms = 0; - size_t page_read_once = 5; + size_t num_pages_read = 5; DB::UInt32 index = 0; DB::PageId max_page_id = MAX_PAGE_ID_DEFAULT; + const std::unique_ptr & global_stat; }; // PSWindowReader can better simulate the user's workload in cooperation with 
PSWindowsWriter @@ -233,12 +263,10 @@ class PSReader : public PSRunnable class PSWindowReader : public PSReader { public: - PSWindowReader(const PSPtr & ps_, DB::UInt32 index_) - : PSReader(ps_, index_) + PSWindowReader(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSReader(ps_, index_, global_stat_) {} - void setWindowSize(size_t window_size); - void setNormalDistributionSigma(size_t sigma); void setWriterNums(size_t writer_nums); @@ -247,7 +275,6 @@ class PSWindowReader : public PSReader DB::PageIds genRandomPageIds() override; protected: - size_t window_size = 100; size_t sigma = 11; size_t writer_nums = 0; std::mt19937 gen; @@ -259,8 +286,8 @@ class PSWindowReader : public PSReader class PSSnapshotReader : public PSReader { public: - PSSnapshotReader(const PSPtr & ps_, DB::UInt32 index_) - : PSReader(ps_, index_) + PSSnapshotReader(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) + : PSReader(ps_, index_, global_stat_) {} bool runImpl() override; diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.cpp b/dbms/src/Storages/Page/workload/PSStressEnv.cpp index 82580954802..6a24b06759a 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.cpp +++ b/dbms/src/Storages/Page/workload/PSStressEnv.cpp @@ -13,6 +13,7 @@ // limitations under the License. +#include #include #include #include @@ -27,7 +28,6 @@ #include #include -#include "Common/Exception.h" namespace DB::PS::tests { @@ -72,7 +72,7 @@ StressEnv StressEnv::parse(int argc, char ** argv) if (options.count("help") > 0) { std::cerr << desc << std::endl; - std::cerr << StressWorkloadManger::getInstance().toDebugStirng() << std::endl; + std::cerr << PageWorkloadFactory::getInstance().toDebugStirng() << std::endl; exit(0); } @@ -114,8 +114,9 @@ StressEnv StressEnv::parse(int argc, char ** argv) void setupSignal() { signal(SIGINT, [](int /*signal*/) { - LOG_ERROR(StressEnv::logger, "Receive finish signal. 
Wait for the GC threads to end."); + LOG_INFO(StressEnv::logger, "Receive finish signal. Wait for the threads finish"); StressEnvStatus::getInstance().setStat(STATUS_INTERRUPT); + PageWorkloadFactory::getInstance().stopWorkload(); }); } diff --git a/dbms/src/Storages/Page/workload/PSWorkload.cpp b/dbms/src/Storages/Page/workload/PSWorkload.cpp index 3d0bfe0310f..28ebac753f2 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.cpp +++ b/dbms/src/Storages/Page/workload/PSWorkload.cpp @@ -20,6 +20,8 @@ #include #include +#include + namespace DB::PS::tests { void StressWorkload::onDumpResult() @@ -107,6 +109,8 @@ void StressWorkload::initPageStorage(DB::PageStorageConfig & config, String path }); LOG_INFO(StressEnv::logger, "Recover {} pages.", num_of_pages); } + + runtime_stat = std::make_unique(); } void StressWorkload::startBackgroundTimer() @@ -139,19 +143,19 @@ void StressWorkload::startBackgroundTimer() } } -void StressWorkloadManger::runWorkload() +void PageWorkloadFactory::runWorkload() { if (options.just_init_pages || options.situation_mask == NORMAL_WORKLOAD) { String name; WorkloadCreator func; std::tie(name, func) = get(NORMAL_WORKLOAD); - auto workload = std::shared_ptr(func(options)); - LOG_INFO(StressEnv::logger, "Start Running {} , {}", name, workload->desc()); - workload->run(); + running_workload = std::shared_ptr(func(options)); + LOG_INFO(StressEnv::logger, "Start Running {}, {}", name, running_workload->desc()); + running_workload->run(); if (!options.just_init_pages) { - workload->onDumpResult(); + running_workload->onDumpResult(); } return; } @@ -166,18 +170,19 @@ void StressWorkloadManger::runWorkload() { auto & name = it.second.first; auto & creator = it.second.second; - auto workload = creator(options); - LOG_INFO(StressEnv::logger, "Start Running {} , {}", name, workload->desc()); - workload->run(); - if (options.verify && !workload->verify()) + running_workload = creator(options); + SCOPE_EXIT({ running_workload.reset(); }); + 
LOG_INFO(StressEnv::logger, "Start Running {}, {}", name, running_workload->desc()); + running_workload->run(); + if (options.verify && !running_workload->verify()) { - LOG_WARNING(StressEnv::logger, "work load : {} failed.", name); - workload->onFailed(); + LOG_WARNING(StressEnv::logger, "work load: {} failed.", name); + running_workload->onFailed(); break; } else { - workload->onDumpResult(); + running_workload->onDumpResult(); } } } diff --git a/dbms/src/Storages/Page/workload/PSWorkload.h b/dbms/src/Storages/Page/workload/PSWorkload.h index 8c2bc85ef75..5824884e20c 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.h +++ b/dbms/src/Storages/Page/workload/PSWorkload.h @@ -24,6 +24,8 @@ #include #include +#include + #define NORMAL_WORKLOAD 0 namespace DB::PS::tests { @@ -64,6 +66,18 @@ class StressWorkload virtual void onFailed() {} virtual void onDumpResult(); + void stop() + { + if (stress_time) + stress_time->stop(); + if (scanner) + scanner->stop(); + if (gc) + gc->stop(); + if (metrics_dumper) + metrics_dumper->stop(); + } + protected: void initPageStorage(DB::PageStorageConfig & config, String path_prefix = ""); @@ -75,7 +89,7 @@ class StressWorkload writers.clear(); for (size_t i = 0; i < nums_writers; ++i) { - auto writer = std::make_shared(ps, i); + auto writer = std::make_shared(ps, i, runtime_stat); if (writer_configure) { writer_configure(writer); @@ -91,7 +105,7 @@ class StressWorkload readers.clear(); for (size_t i = 0; i < nums_readers; ++i) { - auto reader = std::make_shared(ps, i); + auto reader = std::make_shared(ps, i, runtime_stat); if (reader_configure) { reader_configure(reader); @@ -108,6 +122,8 @@ class StressWorkload PSPtr ps; DB::PSDiskDelegatorPtr delegator; + std::unique_ptr runtime_stat; + std::list> writers; std::list> readers; @@ -120,7 +136,7 @@ class StressWorkload }; -class StressWorkloadManger +class PageWorkloadFactory { private: using WorkloadCreator = std::function(const StressEnv &)>; @@ -128,14 +144,14 @@ class 
StressWorkloadManger std::map> funcs; UInt64 registed_masks = 0; - StressWorkloadManger() = default; + PageWorkloadFactory() = default; public: - DISALLOW_COPY_AND_MOVE(StressWorkloadManger); + DISALLOW_COPY_AND_MOVE(PageWorkloadFactory); - static StressWorkloadManger & getInstance() + static PageWorkloadFactory & getInstance() { - static StressWorkloadManger instance; + static PageWorkloadFactory instance; return instance; } @@ -189,14 +205,21 @@ class StressWorkloadManger void runWorkload(); + void stopWorkload() + { + if (running_workload) + running_workload->stop(); + } + private: StressEnv options; + std::shared_ptr running_workload; }; template void work_load_register() { - StressWorkloadManger::getInstance().reg( + PageWorkloadFactory::getInstance().reg( Workload::nameFunc(), Workload::maskFunc(), [](const StressEnv & opts) -> std::shared_ptr { diff --git a/dbms/src/Storages/Page/workload/ThousandsOfOffset.h b/dbms/src/Storages/Page/workload/ThousandsOfOffset.h index 4bb7a3042c8..13ca78a56b7 100644 --- a/dbms/src/Storages/Page/workload/ThousandsOfOffset.h +++ b/dbms/src/Storages/Page/workload/ThousandsOfOffset.h @@ -89,7 +89,6 @@ class ThousandsOfOffset : public StressWorkload writer->setFieldSize(field_size); writer->setBatchBufferNums(1); writer->setBatchBufferSize(buffer_size); - writer->setWindowSize(500); writer->setNormalDistributionSigma(13); }); @@ -111,7 +110,6 @@ class ThousandsOfOffset : public StressWorkload writer->setFieldSize(field_size); writer->setBatchBufferNums(20); writer->setBatchBufferSize(buffer_size); - writer->setWindowSize(500); writer->setNormalDistributionSigma(13); }); @@ -133,7 +131,6 @@ class ThousandsOfOffset : public StressWorkload writer->setFieldSize(field_size); writer->setBatchBufferNums(1); writer->setBatchBufferSize(buffer_size); - writer->setWindowSize(500); writer->setNormalDistributionSigma(13); }); @@ -155,7 +152,6 @@ class ThousandsOfOffset : public StressWorkload writer->setFieldSize(field_size); 
writer->setBatchBufferNums(20); writer->setBatchBufferSize(buffer_size); - writer->setWindowSize(500); writer->setNormalDistributionSigma(13); }); From b6a7355a879f096699256e9075a6cd3f8e49c31e Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 25 Oct 2022 01:47:03 +0800 Subject: [PATCH 03/12] Keep track of PageIds in memory --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 7 +-- .../Page/workload/HeavySkewWriteRead.h | 9 +++- .../src/Storages/Page/workload/PSRunnable.cpp | 53 +++++++++++-------- dbms/src/Storages/Page/workload/PSRunnable.h | 9 ++-- 4 files changed, 49 insertions(+), 29 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index ea21000443b..b11228fb687 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -1124,12 +1124,13 @@ PageId PageDirectory::getMaxId() const std::set PageDirectory::getAllPageIds() { std::set page_ids; - std::shared_lock read_lock(table_rw_mutex); + std::shared_lock read_lock(table_rw_mutex); + const auto seq = sequence.load(); for (auto & [page_id, versioned] : mvcc_table_directory) { - (void)versioned; - page_ids.insert(page_id); + if (versioned->isVisible(seq)) + page_ids.insert(page_id); } return page_ids; } diff --git a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h index fbefd7b02f6..9129196bf9d 100644 --- a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h +++ b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include namespace DB::PS::tests @@ -38,8 +39,8 @@ class HeavySkewWriteRead : public StressWorkload String desc() override { return fmt::format("Some of options will be ignored" - "`paths` will only used first one. which is {}. Data will store in {} ." - "Please cleanup folder after this test." 
+ "`paths` will only used first one. which is {}. Data will store in {}. " + "Please cleanup folder after this test. " "The current workload will elapse near 60 seconds", options.paths[0], options.paths[0] + "/" + name()); @@ -72,6 +73,10 @@ class HeavySkewWriteRead : public StressWorkload pool.joinAll(); stop_watch.stop(); } + + // ps->traverse([](const DB::Page & page) { + // LOG_INFO(StressEnv::logger, "page_id={}", page.page_id); + // }); } bool verify() override diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index b2c934e2844..f1f1a32e3b4 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -26,10 +26,23 @@ #include #include +#include #include namespace DB::PS::tests { + +void GlobalStat::commit(const RandomPageId & c) +{ + std::lock_guard lock(mtx_page_id); + commit_ids.insert(c.page_id); + for (const auto & id : c.page_id_to_remove) + { + commit_ids.erase(id); + pending_remove_ids.erase(id); + } +} + void PSRunnable::run() try { @@ -142,6 +155,8 @@ bool PSWriter::runImpl() // verbose logging for debug // LOG_TRACE(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); + + global_stat->commit(r); return true; } @@ -200,7 +215,8 @@ bool PSCommonWriter::runImpl() ps->write(std::move(wb)); // verbose logging for debug - // LOG_DEBUG(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); + // LOG_TRACE(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); + global_stat->commit(r); return (batch_buffer_limit == 0 || bytes_used < batch_buffer_limit); } @@ -340,17 +356,19 @@ RandomPageId PSWindowWriter::genRandomPageId() global_stat->left_id_boundary = left_boundary; // Remove the page id that is not likely update/read any more - const auto old_removed_id = global_stat->last_removed_page_id; - for (size_t i = old_removed_id; i < 
left_boundary; ++i) - ids_to_del.insert(i); - global_stat->last_removed_page_id = left_boundary; + for (const auto & id : global_stat->commit_ids) + { + if (id >= left_boundary) + break; + ids_to_del.insert(id); + global_stat->pending_remove_ids.insert(id); + } auto page_id = global_stat->right_id_boundary++; if (page_id % 200 == 0) - LOG_INFO(StressEnv::logger, "Remove old id range [{}, {}), update boundary to [{}, {})", old_removed_id, left_boundary, left_boundary, global_stat->right_id_boundary); + LOG_INFO(StressEnv::logger, "Update boundary to [{}, {})", left_boundary, global_stat->right_id_boundary); return page_id; }(); - global_stat->writing_page[index] = page_id; return RandomPageId(page_id, ids_to_del); } @@ -370,8 +388,6 @@ DB::PageIds PSWindowReader::genRandomPageIds() // Nothing to read if (page_id_boundary_copy < (writer_nums + num_pages_read)) return {}; - if (global_stat->left_id_boundary.load() == 0) - return {}; const size_t read_right_boundary = page_id_boundary_copy - writer_nums - num_pages_read; @@ -388,21 +404,16 @@ DB::PageIds PSWindowReader::genRandomPageIds() rand_id = std::min(rand_id, read_right_boundary); DB::PageIds page_ids; - for (size_t id = rand_id; id < num_pages_read + rand_id; ++id) + std::lock_guard lock(global_stat->mtx_page_id); { - bool is_being_write = [this](DB::PageId page_id) { - std::lock_guard page_id_lock(global_stat->mtx_page_id); - for (size_t j = 0; j < writer_nums; j++) + for (size_t id = rand_id; id < num_pages_read + rand_id; ++id) + { + if (global_stat->commit_ids.find(id) != global_stat->commit_ids.end() + && global_stat->pending_remove_ids.find(id) == global_stat->pending_remove_ids.end()) { - if (page_id == global_stat->writing_page[j]) - { - return true; - } + page_ids.emplace_back(id); } - return false; - }(id); - if (!is_being_write) - page_ids.emplace_back(id); + } } return page_ids; diff --git a/dbms/src/Storages/Page/workload/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h index 
79d3f91af73..42890375bce 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -57,11 +57,14 @@ struct GlobalStat // shared status between PSWindowWriter and PSWindowReader std::mutex mtx_page_id; // The page ids between [left_id_boundary, right_id_boundary) - // and not exists in `writing_page` is readable + // and exists in `commit_ids` && not exists in `pending_remove_ids` + // is readable std::atomic right_id_boundary = 0; std::atomic left_id_boundary = 0; - DB::PageId last_removed_page_id = 0; - DB::PageId writing_page[1000]; + std::set commit_ids; + std::set pending_remove_ids; + + void commit(const RandomPageId & c); }; class PSWriter : public PSRunnable From cce626650c3925de9c5b0f3568ee3d40a5a6294d Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 25 Oct 2022 01:54:19 +0800 Subject: [PATCH 04/12] cleanup useless code --- .../Storages/Page/workload/HeavySkewWriteRead.h | 7 +++---- dbms/src/Storages/Page/workload/PSRunnable.cpp | 14 ++------------ dbms/src/Storages/Page/workload/PSRunnable.h | 5 ----- 3 files changed, 5 insertions(+), 21 deletions(-) diff --git a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h index 9129196bf9d..dcddb742cad 100644 --- a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h +++ b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h @@ -57,16 +57,15 @@ class HeavySkewWriteRead : public StressWorkload { stop_watch.start(); const auto num_writers = options.num_writers; - startWriter(num_writers, [&](std::shared_ptr writer) -> void { + startWriter(num_writers, [&](std::shared_ptr writer) { writer->setBatchBufferNums(1); writer->setBatchBufferRange(0, options.avg_page_size * 2); writer->setNormalDistributionSigma(250); }); - startReader(options.num_readers, [num_writers](std::shared_ptr reader) -> void { - reader->setPageReadOnce(5); + startReader(options.num_readers, [](std::shared_ptr reader) { + 
reader->setReadPageNums(5); reader->setReadDelay(0); - reader->setWriterNums(num_writers); reader->setNormalDistributionSigma(250); }); diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index f1f1a32e3b4..971dd1c1abc 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -300,11 +300,6 @@ bool PSReader::runImpl() return true; } -void PSReader::setPageReadOnce(size_t page_read_once_) -{ - num_pages_read = page_read_once_; -} - void PSReader::setReadDelay(size_t delay_ms) { heavy_read_delay_ms = delay_ms; @@ -377,19 +372,14 @@ void PSWindowReader::setNormalDistributionSigma(size_t sigma_) sigma = sigma_; } -void PSWindowReader::setWriterNums(size_t writer_nums_) -{ - writer_nums = writer_nums_; -} - DB::PageIds PSWindowReader::genRandomPageIds() { const auto page_id_boundary_copy = global_stat->right_id_boundary.load(); // Nothing to read - if (page_id_boundary_copy < (writer_nums + num_pages_read)) + if (page_id_boundary_copy < num_pages_read) return {}; - const size_t read_right_boundary = page_id_boundary_copy - writer_nums - num_pages_read; + const size_t read_right_boundary = page_id_boundary_copy - num_pages_read; // Generate a random number in the window, normal dist by μ=0 and σ=sigma std::normal_distribution<> distribution{0.0, static_cast(sigma)}; diff --git a/dbms/src/Storages/Page/workload/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h index 42890375bce..2fa038b1abe 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -230,8 +230,6 @@ class PSReader : public PSRunnable bool runImpl() override; - void setPageReadOnce(size_t page_read_once); - void setReadDelay(size_t delay_ms); void setReadPageRange(size_t max_page_id); @@ -272,14 +270,11 @@ class PSWindowReader : public PSReader void setNormalDistributionSigma(size_t sigma); - void setWriterNums(size_t writer_nums); 
- protected: DB::PageIds genRandomPageIds() override; protected: size_t sigma = 11; - size_t writer_nums = 0; std::mt19937 gen; }; From 0f175f30717d220cc3a150325ca6976717926d05 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 28 Oct 2022 16:06:14 +0800 Subject: [PATCH 05/12] Make perf test more stable --- dbms/src/Storages/Page/workload/HeavyRead.h | 2 +- dbms/src/Storages/Page/workload/Normal.h | 3 +- .../src/Storages/Page/workload/PSRunnable.cpp | 80 ++++++++----------- dbms/src/Storages/Page/workload/PSRunnable.h | 18 ++--- 4 files changed, 45 insertions(+), 58 deletions(-) diff --git a/dbms/src/Storages/Page/workload/HeavyRead.h b/dbms/src/Storages/Page/workload/HeavyRead.h index 79101605ce1..3614a87d3e8 100644 --- a/dbms/src/Storages/Page/workload/HeavyRead.h +++ b/dbms/src/Storages/Page/workload/HeavyRead.h @@ -50,7 +50,7 @@ class HeavyRead : public StressWorkload { DB::PageStorageConfig config; initPageStorage(config, name()); - PSWriter::fillAllPages(ps); + PSWriter::fillAllPages(ps, MAX_PAGE_ID_DEFAULT); metrics_dumper = std::make_shared(1); metrics_dumper->start(); diff --git a/dbms/src/Storages/Page/workload/Normal.h b/dbms/src/Storages/Page/workload/Normal.h index a49c3f27c95..650b9550312 100644 --- a/dbms/src/Storages/Page/workload/Normal.h +++ b/dbms/src/Storages/Page/workload/Normal.h @@ -56,7 +56,8 @@ class NormalWorkload // init all pages in PageStorage if (options.init_pages || options.just_init_pages) { - PSWriter::fillAllPages(ps); + static constexpr PageId MAX_PAGE_ID_DEFAULT = 1000; + PSWriter::fillAllPages(ps, MAX_PAGE_ID_DEFAULT); LOG_INFO(StressEnv::logger, "All pages have been init."); if (options.just_init_pages) { diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index 971dd1c1abc..f47ba2ee267 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -75,61 +75,53 @@ size_t PSRunnable::getPagesUsed() const return 
pages_used; } -size_t PSWriter::approx_page_mb = 2; -void PSWriter::setApproxPageSize(size_t size) +/// +/// Writer +/// + +size_t PSWriter::approx_page_bytes = 2; +void PSWriter::setApproxPageSize(size_t bytes) { - LOG_INFO(StressEnv::logger, "Page approx size is set to {} MB", formatReadableSizeWithBinarySuffix(size)); - approx_page_mb = size * 1024 * 1024; + LOG_INFO(StressEnv::logger, "Page approx size is set to {}", formatReadableSizeWithBinarySuffix(bytes)); + approx_page_bytes = bytes; } -DB::ReadBufferPtr PSWriter::genRandomData(const DB::PageId pageId, DB::MemHolder & holder) +DB::ReadBufferPtr PSWriter::genRandomData(DB::PageId page_id, std::unique_ptr & p) { // fill page with random bytes std::mt19937 size_gen; size_gen.seed(time(nullptr)); std::uniform_int_distribution<> dist(0, 3000); - const size_t buff_sz = approx_page_mb * DB::MB + dist(size_gen); - char * buff = static_cast(malloc(buff_sz)); // NOLINT - if (buff == nullptr) - { - throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); - } - - const char buff_ch = pageId % 0xFF; - memset(buff, buff_ch, buff_sz); + const size_t buff_sz = approx_page_bytes + dist(size_gen); + p.reset(new char[buff_sz]); - holder = DB::createMemHolder(buff, [&](char * p) { free(p); }); // NOLINT + const char buff_ch = page_id % 0xFF; + memset(p.get(), buff_ch, buff_sz); - return std::make_shared(const_cast(buff), buff_sz); + return std::make_shared(p.get(), buff_sz); } void PSWriter::updatedRandomData() { - size_t memory_size = approx_page_mb * DB::MB * 2; + size_t memory_size = approx_page_bytes * 2; if (memory == nullptr) { - memory = static_cast(malloc(memory_size)); // NOLINT - if (memory == nullptr) - { - throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); - } + memory = std::unique_ptr(new char[memory_size]); for (size_t i = 0; i < memory_size; i++) - { - memset(memory + i, i % 0xFF, sizeof(char)); - } + memory[i] = i % 0xFF; } - std::uniform_int_distribution<> 
dist(0, memory_size / 2 - 1); - size_t gen_size = dist(gen); - buff_ptr = std::make_shared(memory + gen_size, memory_size - gen_size); + // std::uniform_int_distribution<> dist(0, memory_size / 2 - 1); + // size_t gen_size = dist(gen); + buff_ptr = std::make_shared(memory.get(), approx_page_bytes); } -void PSWriter::fillAllPages(const PSPtr & ps) +void PSWriter::fillAllPages(const PSPtr & ps, size_t max_page_id) { - for (DB::PageId page_id = 0; page_id <= MAX_PAGE_ID_DEFAULT; ++page_id) + for (DB::PageId page_id = 0; page_id <= max_page_id; ++page_id) { - DB::MemHolder holder; + std::unique_ptr holder; DB::ReadBufferPtr buff = genRandomData(page_id, holder); DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; @@ -143,14 +135,15 @@ void PSWriter::fillAllPages(const PSPtr & ps) bool PSWriter::runImpl() { const auto r = genRandomPageId(); - updatedRandomData(); + updatedRandomData(); // update buff_ptr DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size()); for (const auto id : r.page_id_to_remove) wb.delPage(id); ps->write(std::move(wb)); - ++pages_used; + + pages_used += 1; bytes_used += buff_ptr->buffer().size(); // verbose logging for debug @@ -162,8 +155,9 @@ bool PSWriter::runImpl() RandomPageId PSWriter::genRandomPageId() { - std::normal_distribution<> distribution{static_cast(max_page_id) / 2, 150}; - return RandomPageId(static_cast(std::round(distribution(gen))) % max_page_id); + // std::normal_distribution<> distribution{static_cast(max_page_id) / 2, 150}; + std::uniform_int_distribution dist(0UL, max_page_id - 1); + return RandomPageId(static_cast(std::round(dist(gen)))); } void PSCommonWriter::updatedRandomData() @@ -175,16 +169,9 @@ void PSCommonWriter::updatedRandomData() if (memory == nullptr) { - memory = static_cast(malloc(memory_size)); // NOLINT - if (memory == nullptr) - { - throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); - } - + memory = std::unique_ptr(new char[memory_size]); 
for (size_t i = 0; i < memory_size; i++) - { - memset(memory + i, i % 0xFF, sizeof(char)); - } + memory[i] = i % 0xFF; } buff_ptrs.clear(); @@ -192,7 +179,7 @@ void PSCommonWriter::updatedRandomData() size_t gen_size = genBufferSize(); for (size_t i = 0; i < batch_buffer_nums; ++i) { - buff_ptrs.emplace_back(std::make_shared(memory + i * single_buff_size, gen_size)); + buff_ptrs.emplace_back(std::make_shared(memory.get() + i * single_buff_size, gen_size)); } } @@ -269,6 +256,9 @@ size_t PSCommonWriter::genBufferSize() return batch_buffer_size; } +/// +/// Reader +/// DB::PageIds PSReader::genRandomPageIds() { diff --git a/dbms/src/Storages/Page/workload/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h index 2fa038b1abe..93b858e7070 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -17,10 +17,9 @@ #include #include -const DB::PageId MAX_PAGE_ID_DEFAULT = 1000; - namespace DB::PS::tests { +static constexpr PageId MAX_PAGE_ID_DEFAULT = 1000; class PSRunnable : public Poco::Runnable { public: @@ -69,7 +68,7 @@ struct GlobalStat class PSWriter : public PSRunnable { - static size_t approx_page_mb; + static size_t approx_page_bytes; public: PSWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) @@ -82,10 +81,7 @@ class PSWriter : public PSRunnable ~PSWriter() override { - if (memory != nullptr) - { - free(memory); - } + memory.reset(); } String description() override @@ -93,13 +89,13 @@ class PSWriter : public PSRunnable return fmt::format("(Stress Test Writer {})", index); } - static void setApproxPageSize(size_t size_mb); + static void setApproxPageSize(size_t bytes); - static DB::ReadBufferPtr genRandomData(DB::PageId pageId, DB::MemHolder & holder); + static DB::ReadBufferPtr genRandomData(DB::PageId page_id, std::unique_ptr & p); virtual void updatedRandomData(); - static void fillAllPages(const PSPtr & ps); + static void fillAllPages(const PSPtr & ps, size_t 
max_page_id); bool runImpl() override; @@ -111,7 +107,7 @@ class PSWriter : public PSRunnable DB::UInt32 index = 0; std::mt19937 gen; DB::PageId max_page_id = MAX_PAGE_ID_DEFAULT; - char * memory = nullptr; + std::unique_ptr memory; DB::ReadBufferPtr buff_ptr; const std::unique_ptr & global_stat; }; From e2a99739d486a018955fb05bef4257c0bdf7a32c Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 28 Oct 2022 22:43:32 +0800 Subject: [PATCH 06/12] Remove useless code --- .../Page/workload/HeavyMemoryCostInGC.h | 2 +- dbms/src/Storages/Page/workload/HeavyRead.h | 4 +- .../Page/workload/HeavySkewWriteRead.h | 2 +- dbms/src/Storages/Page/workload/HeavyWrite.h | 2 +- .../Page/workload/HighValidBigFileGC.h | 6 +- .../Page/workload/HoldSnapshotsLongTime.h | 2 +- dbms/src/Storages/Page/workload/Normal.h | 13 +- .../src/Storages/Page/workload/PSRunnable.cpp | 145 +++++++----------- dbms/src/Storages/Page/workload/PSRunnable.h | 31 ++-- .../src/Storages/Page/workload/PSWorkload.cpp | 42 +++++ dbms/src/Storages/Page/workload/PSWorkload.h | 2 + .../workload/PageStorageInMemoryCapacity.h | 2 +- .../Page/workload/ThousandsOfOffset.h | 8 +- 13 files changed, 129 insertions(+), 132 deletions(-) diff --git a/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h index e8d66cbad88..8dd68dfed6b 100644 --- a/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h +++ b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h @@ -60,7 +60,7 @@ class HeavyMemoryCostInGC startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(100); - writer->setBatchBufferSize(1); + writer->setBufferSizeRange(1, 1); }); pool.joinAll(); diff --git a/dbms/src/Storages/Page/workload/HeavyRead.h b/dbms/src/Storages/Page/workload/HeavyRead.h index 3614a87d3e8..76893f5cf94 100644 --- a/dbms/src/Storages/Page/workload/HeavyRead.h +++ b/dbms/src/Storages/Page/workload/HeavyRead.h @@ -14,6 +14,8 @@ #include +#include 
"Storages/Page/workload/PSRunnable.h" + namespace DB::PS::tests { class HeavyRead : public StressWorkload @@ -50,7 +52,7 @@ class HeavyRead : public StressWorkload { DB::PageStorageConfig config; initPageStorage(config, name()); - PSWriter::fillAllPages(ps, MAX_PAGE_ID_DEFAULT); + initPages(MAX_PAGE_ID_DEFAULT); metrics_dumper = std::make_shared(1); metrics_dumper->start(); diff --git a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h index dcddb742cad..8fe8537ec4f 100644 --- a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h +++ b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h @@ -59,7 +59,7 @@ class HeavySkewWriteRead : public StressWorkload const auto num_writers = options.num_writers; startWriter(num_writers, [&](std::shared_ptr writer) { writer->setBatchBufferNums(1); - writer->setBatchBufferRange(0, options.avg_page_size * 2); + writer->setBufferSizeRange(0, options.avg_page_size * 2); writer->setNormalDistributionSigma(250); }); diff --git a/dbms/src/Storages/Page/workload/HeavyWrite.h b/dbms/src/Storages/Page/workload/HeavyWrite.h index 4e88e0f34f3..41aba5486f0 100644 --- a/dbms/src/Storages/Page/workload/HeavyWrite.h +++ b/dbms/src/Storages/Page/workload/HeavyWrite.h @@ -59,7 +59,7 @@ class HeavyWrite : public StressWorkload stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(4); - writer->setBatchBufferRange(1, 2 * DB::MB); + writer->setBufferSizeRange(1, 2 * DB::MB); }); pool.joinAll(); diff --git a/dbms/src/Storages/Page/workload/HighValidBigFileGC.h b/dbms/src/Storages/Page/workload/HighValidBigFileGC.h index cbccd0aa58a..3bc233fcf9f 100644 --- a/dbms/src/Storages/Page/workload/HighValidBigFileGC.h +++ b/dbms/src/Storages/Page/workload/HighValidBigFileGC.h @@ -65,7 +65,7 @@ class HighValidBigFileGCWorkload startWriter(1, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); - 
writer->setBatchBufferSize(100ULL * DB::MB); + writer->setBufferSizeRange(100ULL * DB::MB, 100ULL * DB::MB); writer->setBatchBufferLimit(8ULL * DB::GB); writer->setBatchBufferPageRange(1000000); }); @@ -86,7 +86,7 @@ class HighValidBigFileGCWorkload stop_watch.start(); startWriter(1, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(4); - writer->setBatchBufferSize(2ULL * DB::MB); + writer->setBufferSizeRange(2ULL * DB::MB, 2ULL * DB::MB); writer->setBatchBufferLimit(1ULL * DB::GB); writer->setBatchBufferPageRange(1000000); }); @@ -103,7 +103,7 @@ class HighValidBigFileGCWorkload stop_watch.start(); startWriter(1, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(4); - writer->setBatchBufferSize(2ULL * DB::MB); + writer->setBufferSizeRange(2ULL * DB::MB, 2ULL * DB::MB); writer->setBatchBufferLimit(1ULL * DB::GB); writer->setBatchBufferPageRange(1000000); }); diff --git a/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h index e4789271d75..07273d9d64e 100644 --- a/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h +++ b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h @@ -59,7 +59,7 @@ class HoldSnapshotsLongTime : public StressWorkload stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); - writer->setBatchBufferRange(10 * 1024, 1 * DB::MB); + writer->setBufferSizeRange(10 * 1024, 1 * DB::MB); writer->setNormalDistributionSigma(13); }); diff --git a/dbms/src/Storages/Page/workload/Normal.h b/dbms/src/Storages/Page/workload/Normal.h index 650b9550312..05112a934d1 100644 --- a/dbms/src/Storages/Page/workload/Normal.h +++ b/dbms/src/Storages/Page/workload/Normal.h @@ -14,6 +14,8 @@ #include +#include "Storages/Page/workload/PSRunnable.h" + namespace DB::PS::tests { class NormalWorkload @@ -48,16 +50,11 @@ class NormalWorkload config.num_write_slots = options.num_writer_slots; initPageStorage(config); 
- if (options.avg_page_size != 0) - { - PSWriter::setApproxPageSize(options.avg_page_size); - } - // init all pages in PageStorage if (options.init_pages || options.just_init_pages) { static constexpr PageId MAX_PAGE_ID_DEFAULT = 1000; - PSWriter::fillAllPages(ps, MAX_PAGE_ID_DEFAULT); + initPages(MAX_PAGE_ID_DEFAULT); LOG_INFO(StressEnv::logger, "All pages have been init."); if (options.just_init_pages) { @@ -67,7 +64,9 @@ class NormalWorkload stop_watch.start(); - startWriter(options.num_writers); + startWriter(options.num_writers, [this](std::shared_ptr w) { + w->setBufferSizeRange(options.avg_page_size, options.avg_page_size); + }); const size_t read_delay_ms = options.read_delay_ms; startReader(options.num_readers, [read_delay_ms](std::shared_ptr reader) -> void { reader->setReadDelay(read_delay_ms); diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index f47ba2ee267..de0c3b5079c 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -79,32 +79,9 @@ size_t PSRunnable::getPagesUsed() const /// Writer /// -size_t PSWriter::approx_page_bytes = 2; -void PSWriter::setApproxPageSize(size_t bytes) +DB::ReadBufferPtr PSWriter::updatedRandomData() { - LOG_INFO(StressEnv::logger, "Page approx size is set to {}", formatReadableSizeWithBinarySuffix(bytes)); - approx_page_bytes = bytes; -} - -DB::ReadBufferPtr PSWriter::genRandomData(DB::PageId page_id, std::unique_ptr & p) -{ - // fill page with random bytes - std::mt19937 size_gen; - size_gen.seed(time(nullptr)); - std::uniform_int_distribution<> dist(0, 3000); - - const size_t buff_sz = approx_page_bytes + dist(size_gen); - p.reset(new char[buff_sz]); - - const char buff_ch = page_id % 0xFF; - memset(p.get(), buff_ch, buff_sz); - - return std::make_shared(p.get(), buff_sz); -} - -void PSWriter::updatedRandomData() -{ - size_t memory_size = approx_page_bytes * 2; + size_t memory_size = buffer_size_max; 
if (memory == nullptr) { memory = std::unique_ptr(new char[memory_size]); @@ -112,30 +89,25 @@ void PSWriter::updatedRandomData() memory[i] = i % 0xFF; } - // std::uniform_int_distribution<> dist(0, memory_size / 2 - 1); - // size_t gen_size = dist(gen); - buff_ptr = std::make_shared(memory.get(), approx_page_bytes); + std::uniform_int_distribution<> dist(buffer_size_min, buffer_size_max - 1); + size_t gen_size = dist(gen); + return std::make_shared(memory.get(), gen_size); } -void PSWriter::fillAllPages(const PSPtr & ps, size_t max_page_id) +void PSWriter::setBufferSizeRange(size_t min, size_t max) { - for (DB::PageId page_id = 0; page_id <= max_page_id; ++page_id) - { - std::unique_ptr holder; - DB::ReadBufferPtr buff = genRandomData(page_id, holder); - - DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; - wb.putPage(page_id, 0, buff, buff->buffer().size()); - ps->write(std::move(wb)); - if (page_id % 100 == 0) - LOG_INFO(StressEnv::logger, "writer wrote page {}", page_id); - } + RUNTIME_CHECK(min >= 1); + RUNTIME_CHECK(max >= min); + buffer_size_min = min; + buffer_size_max = max; + + if (buffer_size_max - buffer_size_min >= 4096) + LOG_WARNING(StressEnv::logger, "The result maybe not stable, min_size={} max_size={}", min, max); } -bool PSWriter::runImpl() +void PSWriter::write(const RandomPageId & r) { - const auto r = genRandomPageId(); - updatedRandomData(); // update buff_ptr + auto buff_ptr = updatedRandomData(); // update buff_ptr DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size()); @@ -150,6 +122,11 @@ bool PSWriter::runImpl() // LOG_TRACE(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); global_stat->commit(r); +} + +bool PSWriter::runImpl() +{ + write(genRandomPageId()); return true; } @@ -160,11 +137,10 @@ RandomPageId PSWriter::genRandomPageId() return RandomPageId(static_cast(std::round(dist(gen)))); } -void PSCommonWriter::updatedRandomData() +DB::ReadBufferPtr 
PSCommonWriter::updatedRandomData() { // Calculate the fixed memory size - size_t single_buff_size = ((buffer_size_min <= buffer_size_max && buffer_size_max > 0) ? buffer_size_max - : batch_buffer_size); + size_t single_buff_size = buffer_size_max; size_t memory_size = single_buff_size * batch_buffer_nums; if (memory == nullptr) @@ -174,37 +150,47 @@ void PSCommonWriter::updatedRandomData() memory[i] = i % 0xFF; } - buff_ptrs.clear(); - - size_t gen_size = genBufferSize(); - for (size_t i = 0; i < batch_buffer_nums; ++i) - { - buff_ptrs.emplace_back(std::make_shared(memory.get() + i * single_buff_size, gen_size)); - } + std::uniform_int_distribution<> dist(buffer_size_min, buffer_size_max); + size_t gen_size = dist(gen); + return std::make_shared(memory.get(), gen_size); } bool PSCommonWriter::runImpl() { const auto r = genRandomPageId(); - DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; - updatedRandomData(); - + size_t page_write = 0; + size_t bytes_write = 0; // FIXME: update one page_id by multiple data in one write batch? 
- for (auto & buffptr : buff_ptrs) + DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; + for (size_t i = 0; i < batch_buffer_nums; ++i) { - wb.putPage(r.page_id, 0, buffptr, buffptr->buffer().size()); - ++pages_used; - bytes_used += buffptr->buffer().size(); + auto buff_ptr = updatedRandomData(); + if (data_sizes.empty()) + { + wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size()); + } + else + { + // mock test for wide table that store many in-page-offsets + wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size(), data_sizes); + } + page_write += 1; + bytes_write += buff_ptr->buffer().size(); } for (const auto & page_id : r.page_id_to_remove) wb.delPage(page_id); ps->write(std::move(wb)); + + pages_used += page_write; + bytes_used += bytes_write; + // verbose logging for debug // LOG_TRACE(StressEnv::logger, "write done, page_id={}, remove={}", r.page_id, r.page_id_to_remove); global_stat->commit(r); - return (batch_buffer_limit == 0 || bytes_used < batch_buffer_limit); + bool keep_running = (batch_buffer_limit == 0 || bytes_used < batch_buffer_limit); + return keep_running; } void PSCommonWriter::setBatchBufferNums(size_t numbers) @@ -212,11 +198,6 @@ void PSCommonWriter::setBatchBufferNums(size_t numbers) batch_buffer_nums = numbers; } -void PSCommonWriter::setBatchBufferSize(size_t size) -{ - batch_buffer_size = size; -} - void PSCommonWriter::setBatchBufferLimit(size_t size_limit) { batch_buffer_limit = size_limit; @@ -227,35 +208,11 @@ void PSCommonWriter::setBatchBufferPageRange(size_t max_page_id_) max_page_id = max_page_id_; } -RandomPageId PSCommonWriter::genRandomPageId() -{ - std::uniform_int_distribution<> dist(0, max_page_id); - return RandomPageId(static_cast(dist(gen))); -} - -void PSCommonWriter::setBatchBufferRange(size_t min, size_t max) -{ - RUNTIME_CHECK(max >= min); - buffer_size_min = std::max(1, min); - buffer_size_max = max; -} - void PSCommonWriter::setFieldSize(const DB::PageFieldSizes & data_sizes_) { data_sizes = data_sizes_; } 
-size_t PSCommonWriter::genBufferSize() -{ - // If set min/max size set, use the range. Otherwise, use batch_buffer_size. - if (buffer_size_min <= buffer_size_max && buffer_size_max > 0) - { - std::uniform_int_distribution<> dist(buffer_size_min, buffer_size_max); - return dist(gen); - } - return batch_buffer_size; -} - /// /// Reader /// @@ -305,6 +262,10 @@ void PSReader::setReadPageNums(size_t page_read_once_) num_pages_read = page_read_once_; } +/// +/// WindowWriter +/// + void PSWindowWriter::setNormalDistributionSigma(size_t sigma_) { sigma = sigma_; @@ -357,6 +318,10 @@ RandomPageId PSWindowWriter::genRandomPageId() return RandomPageId(page_id, ids_to_del); } +/// +/// WindowReader +/// + void PSWindowReader::setNormalDistributionSigma(size_t sigma_) { sigma = sigma_; diff --git a/dbms/src/Storages/Page/workload/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h index 93b858e7070..2420ec14956 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -68,8 +68,6 @@ struct GlobalStat class PSWriter : public PSRunnable { - static size_t approx_page_bytes; - public: PSWriter(const PSPtr & ps_, DB::UInt32 index_, const std::unique_ptr & global_stat_) : ps(ps_) @@ -89,16 +87,14 @@ class PSWriter : public PSRunnable return fmt::format("(Stress Test Writer {})", index); } - static void setApproxPageSize(size_t bytes); - - static DB::ReadBufferPtr genRandomData(DB::PageId page_id, std::unique_ptr & p); - - virtual void updatedRandomData(); + void setBufferSizeRange(size_t min, size_t max); - static void fillAllPages(const PSPtr & ps, size_t max_page_id); + virtual DB::ReadBufferPtr updatedRandomData(); bool runImpl() override; + void write(const RandomPageId & r); + protected: virtual RandomPageId genRandomPageId(); @@ -108,7 +104,10 @@ class PSWriter : public PSRunnable std::mt19937 gen; DB::PageId max_page_id = MAX_PAGE_ID_DEFAULT; std::unique_ptr memory; - DB::ReadBufferPtr buff_ptr; + + size_t 
buffer_size_min = 1 * 1024 * 1024; + size_t buffer_size_max = 1 * 1024 * 1024; + const std::unique_ptr & global_stat; }; @@ -122,7 +121,7 @@ class PSCommonWriter : public PSWriter : PSWriter(ps_, index_, global_stat_) {} - void updatedRandomData() override; + DB::ReadBufferPtr updatedRandomData() override; String description() override { return fmt::format("(Stress Test Common Writer {})", index); } @@ -130,29 +129,17 @@ class PSCommonWriter : public PSWriter void setBatchBufferNums(size_t numbers); - void setBatchBufferSize(size_t size); - void setBatchBufferLimit(size_t size_limit); void setBatchBufferPageRange(size_t max_page_id_); - void setBatchBufferRange(size_t min, size_t max); - void setFieldSize(const DB::PageFieldSizes & data_sizes); protected: - std::vector buff_ptrs; size_t batch_buffer_nums = 100; - size_t batch_buffer_size = 1 * DB::MB; size_t batch_buffer_limit = 0; - size_t buffer_size_min = 0; - size_t buffer_size_max = 0; - DB::PageFieldSizes data_sizes = {}; - - RandomPageId genRandomPageId() override; - virtual size_t genBufferSize(); }; // PSWindowsWriter can better simulate the user's workload in cooperation with PSWindowsReader diff --git a/dbms/src/Storages/Page/workload/PSWorkload.cpp b/dbms/src/Storages/Page/workload/PSWorkload.cpp index 28ebac753f2..1351e95adbd 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.cpp +++ b/dbms/src/Storages/Page/workload/PSWorkload.cpp @@ -22,6 +22,14 @@ #include +#include "Storages/Page/workload/PSRunnable.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#include +#pragma GCC diagnostic pop + namespace DB::PS::tests { void StressWorkload::onDumpResult() @@ -30,23 +38,45 @@ void StressWorkload::onDumpResult() LOG_INFO(options.logger, "result in {}ms", time_interval); double seconds_run = 1.0 * time_interval / 1000; + Poco::JSON::Object::Ptr details = new Poco::JSON::Object(); + size_t total_pages_written = 0; size_t total_bytes_written = 0; + 
Poco::JSON::Array::Ptr json_writers(new Poco::JSON::Array()); for (auto & writer : writers) { total_pages_written += writer->pages_used; total_bytes_written += writer->bytes_used; + + Poco::JSON::Object::Ptr json_writer = new Poco::JSON::Object(); + json_writer->set("pages", writer->pages_used); + json_writer->set("bytes", writer->bytes_used); + json_writers->add(json_writer); } + details->set("writers", json_writers); size_t total_pages_read = 0; size_t total_bytes_read = 0; + Poco::JSON::Array::Ptr json_readers(new Poco::JSON::Array()); for (auto & reader : readers) { total_pages_read += reader->pages_used; total_bytes_read += reader->bytes_used; + + Poco::JSON::Object::Ptr json_reader = new Poco::JSON::Object(); + json_reader->set("pages", reader->pages_used); + json_reader->set("bytes", reader->bytes_used); + json_readers->add(json_reader); } + details->set("readers", json_readers); + + LOG_INFO(options.logger, "{}", [&]() { + std::stringstream ss; + details->stringify(ss); + return ss.str(); + }()); LOG_INFO(options.logger, "W: {} pages, {:.4f} GB, {:.4f} GB/s", @@ -113,6 +143,18 @@ void StressWorkload::initPageStorage(DB::PageStorageConfig & config, String path runtime_stat = std::make_unique(); } +void StressWorkload::initPages(const DB::PageId & max_page_id) +{ + auto writer = std::make_shared(ps, 0, runtime_stat); + for (DB::PageId page_id = 0; page_id <= max_page_id; ++page_id) + { + RandomPageId r(page_id); + writer->write(r); + if (page_id % 100 == 0) + LOG_INFO(StressEnv::logger, "writer wrote page {}", page_id); + } +} + void StressWorkload::startBackgroundTimer() { // A background thread that do GC diff --git a/dbms/src/Storages/Page/workload/PSWorkload.h b/dbms/src/Storages/Page/workload/PSWorkload.h index 5824884e20c..35248b6188b 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.h +++ b/dbms/src/Storages/Page/workload/PSWorkload.h @@ -83,6 +83,8 @@ class StressWorkload void startBackgroundTimer(); + void initPages(const DB::PageId & 
max_page_id); + template void startWriter(size_t nums_writers, std::function)> writer_configure = nullptr) { diff --git a/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h index 15372c4076c..3716aa6c916 100644 --- a/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h +++ b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h @@ -146,7 +146,7 @@ class PageStorageInMemoryCapacity : public StressWorkload stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); - writer->setBatchBufferSize(10 * 1024); + writer->setBufferSizeRange(10ULL * DB::MB, 10ULL * DB::MB); writer->setPageRange(single_writer_page_nums); }); diff --git a/dbms/src/Storages/Page/workload/ThousandsOfOffset.h b/dbms/src/Storages/Page/workload/ThousandsOfOffset.h index 13ca78a56b7..5c4d9cf5cc0 100644 --- a/dbms/src/Storages/Page/workload/ThousandsOfOffset.h +++ b/dbms/src/Storages/Page/workload/ThousandsOfOffset.h @@ -88,7 +88,7 @@ class ThousandsOfOffset : public StressWorkload startWriter(options.num_writers, [field_size, buffer_size](std::shared_ptr writer) -> void { writer->setFieldSize(field_size); writer->setBatchBufferNums(1); - writer->setBatchBufferSize(buffer_size); + writer->setBufferSizeRange(buffer_size, buffer_size); writer->setNormalDistributionSigma(13); }); @@ -109,7 +109,7 @@ class ThousandsOfOffset : public StressWorkload startWriter(options.num_writers, [field_size, buffer_size](std::shared_ptr writer) -> void { writer->setFieldSize(field_size); writer->setBatchBufferNums(20); - writer->setBatchBufferSize(buffer_size); + writer->setBufferSizeRange(buffer_size, buffer_size); writer->setNormalDistributionSigma(13); }); @@ -130,7 +130,7 @@ class ThousandsOfOffset : public StressWorkload startWriter(options.num_writers, [field_size, buffer_size](std::shared_ptr writer) -> void { writer->setFieldSize(field_size); 
writer->setBatchBufferNums(1); - writer->setBatchBufferSize(buffer_size); + writer->setBufferSizeRange(buffer_size, buffer_size); writer->setNormalDistributionSigma(13); }); @@ -151,7 +151,7 @@ class ThousandsOfOffset : public StressWorkload startWriter(options.num_writers, [field_size, buffer_size](std::shared_ptr writer) -> void { writer->setFieldSize(field_size); writer->setBatchBufferNums(20); - writer->setBatchBufferSize(buffer_size); + writer->setBufferSizeRange(buffer_size, buffer_size); writer->setNormalDistributionSigma(13); }); From d4a3d6d447b26c91255492a81935e576bc65d6d6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Sat, 29 Oct 2022 14:40:41 +0800 Subject: [PATCH 07/12] Rename func --- dbms/src/Storages/Page/workload/PSRunnable.cpp | 15 +++++++-------- dbms/src/Storages/Page/workload/PSRunnable.h | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index de0c3b5079c..591063d43cb 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -79,17 +79,16 @@ size_t PSRunnable::getPagesUsed() const /// Writer /// -DB::ReadBufferPtr PSWriter::updatedRandomData() +DB::ReadBufferPtr PSWriter::getRandomData() { - size_t memory_size = buffer_size_max; if (memory == nullptr) { - memory = std::unique_ptr(new char[memory_size]); - for (size_t i = 0; i < memory_size; i++) + memory = std::unique_ptr(new char[buffer_size_max + 1]); + for (size_t i = 0; i < buffer_size_max + 1; i++) memory[i] = i % 0xFF; } - std::uniform_int_distribution<> dist(buffer_size_min, buffer_size_max - 1); + std::uniform_int_distribution<> dist(buffer_size_min, buffer_size_max); size_t gen_size = dist(gen); return std::make_shared(memory.get(), gen_size); } @@ -107,7 +106,7 @@ void PSWriter::setBufferSizeRange(size_t min, size_t max) void PSWriter::write(const RandomPageId & r) { - auto buff_ptr = updatedRandomData(); // 
update buff_ptr + auto buff_ptr = getRandomData(); DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size()); @@ -137,7 +136,7 @@ RandomPageId PSWriter::genRandomPageId() return RandomPageId(static_cast(std::round(dist(gen)))); } -DB::ReadBufferPtr PSCommonWriter::updatedRandomData() +DB::ReadBufferPtr PSCommonWriter::getRandomData() { // Calculate the fixed memory size size_t single_buff_size = buffer_size_max; @@ -165,7 +164,7 @@ bool PSCommonWriter::runImpl() DB::WriteBatch wb{DB::TEST_NAMESPACE_ID}; for (size_t i = 0; i < batch_buffer_nums; ++i) { - auto buff_ptr = updatedRandomData(); + auto buff_ptr = getRandomData(); if (data_sizes.empty()) { wb.putPage(r.page_id, 0, buff_ptr, buff_ptr->buffer().size()); diff --git a/dbms/src/Storages/Page/workload/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h index 2420ec14956..b1e6f19ecd4 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -89,7 +89,7 @@ class PSWriter : public PSRunnable void setBufferSizeRange(size_t min, size_t max); - virtual DB::ReadBufferPtr updatedRandomData(); + virtual DB::ReadBufferPtr getRandomData(); bool runImpl() override; @@ -121,7 +121,7 @@ class PSCommonWriter : public PSWriter : PSWriter(ps_, index_, global_stat_) {} - DB::ReadBufferPtr updatedRandomData() override; + DB::ReadBufferPtr getRandomData() override; String description() override { return fmt::format("(Stress Test Common Writer {})", index); } From 613236f20936d902e1697dfd1cd10b5058fb7c97 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Sun, 30 Oct 2022 16:17:25 +0800 Subject: [PATCH 08/12] Remove useless code --- dbms/src/Storages/Page/workload/Normal.h | 6 +----- dbms/src/Storages/Page/workload/PSStressEnv.cpp | 2 -- dbms/src/Storages/Page/workload/PSStressEnv.h | 4 +--- dbms/src/Storages/Page/workload/PSWorkload.cpp | 7 ++----- 4 files changed, 4 insertions(+), 15 deletions(-) diff --git 
a/dbms/src/Storages/Page/workload/Normal.h b/dbms/src/Storages/Page/workload/Normal.h index 05112a934d1..6443240871d 100644 --- a/dbms/src/Storages/Page/workload/Normal.h +++ b/dbms/src/Storages/Page/workload/Normal.h @@ -51,15 +51,11 @@ class NormalWorkload initPageStorage(config); // init all pages in PageStorage - if (options.init_pages || options.just_init_pages) + if (options.init_pages) { static constexpr PageId MAX_PAGE_ID_DEFAULT = 1000; initPages(MAX_PAGE_ID_DEFAULT); LOG_INFO(StressEnv::logger, "All pages have been init."); - if (options.just_init_pages) - { - return; - } } stop_watch.start(); diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.cpp b/dbms/src/Storages/Page/workload/PSStressEnv.cpp index 6a24b06759a..1561a2d135c 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.cpp +++ b/dbms/src/Storages/Page/workload/PSStressEnv.cpp @@ -52,7 +52,6 @@ StressEnv StressEnv::parse(int argc, char ** argv) ("read_concurrency,R", value()->default_value(16), "number of read threads") // ("clean_before_run,C", value()->default_value(false), "drop data before running") // ("init_pages,I", value()->default_value(false), "init pages if not exist before running") // - ("just_init_pages,J", value()->default_value(false), "Only init pages 0 - 1000.Then quit") // ("timeout,T", value()->default_value(600), "maximum run time (seconds). 
0 means run infinitely") // ("writer_slots", value()->default_value(4), "number of PageStorage writer slots") // ("read_delay_ms", value()->default_value(0), "millionseconds of read delay") // @@ -80,7 +79,6 @@ StressEnv StressEnv::parse(int argc, char ** argv) opt.num_writers = options["write_concurrency"].as(); opt.num_readers = options["read_concurrency"].as(); opt.init_pages = options["init_pages"].as(); - opt.just_init_pages = options["just_init_pages"].as(); opt.clean_before_run = options["clean_before_run"].as(); opt.timeout_s = options["timeout"].as(); opt.read_delay_ms = options["read_delay_ms"].as(); diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.h b/dbms/src/Storages/Page/workload/PSStressEnv.h index c015d23f6ea..3af84fbdabf 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.h +++ b/dbms/src/Storages/Page/workload/PSStressEnv.h @@ -77,7 +77,6 @@ struct StressEnv size_t num_writers = 1; size_t num_readers = 4; bool init_pages = false; - bool just_init_pages = false; bool clean_before_run = false; size_t gc_interval_s = 30; size_t timeout_s = 0; @@ -96,7 +95,7 @@ struct StressEnv { return fmt::format( "{{ " - "num_writers: {}, num_readers: {}, init_pages: {}, just_init_pages: {}" + "num_writers: {}, num_readers: {}, init_pages: {}" ", clean_before_run: {}, timeout_s: {}, read_delay_ms: {}, num_writer_slots: {}" ", avg_page_size: {}, paths: [{}], failpoints: [{}]" ", gc_interval_s: {}" @@ -107,7 +106,6 @@ struct StressEnv num_writers, num_readers, init_pages, - just_init_pages, clean_before_run, timeout_s, read_delay_ms, diff --git a/dbms/src/Storages/Page/workload/PSWorkload.cpp b/dbms/src/Storages/Page/workload/PSWorkload.cpp index 1351e95adbd..06ff86865a7 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.cpp +++ b/dbms/src/Storages/Page/workload/PSWorkload.cpp @@ -187,7 +187,7 @@ void StressWorkload::startBackgroundTimer() void PageWorkloadFactory::runWorkload() { - if (options.just_init_pages || options.situation_mask == 
NORMAL_WORKLOAD) + if (options.situation_mask == NORMAL_WORKLOAD) { String name; WorkloadCreator func; @@ -195,10 +195,7 @@ void PageWorkloadFactory::runWorkload() running_workload = std::shared_ptr(func(options)); LOG_INFO(StressEnv::logger, "Start Running {}, {}", name, running_workload->desc()); running_workload->run(); - if (!options.just_init_pages) - { - running_workload->onDumpResult(); - } + running_workload->onDumpResult(); return; } From ba675a6d1c23f3eda33938b60926fa2fc292ef9e Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 31 Oct 2022 11:56:15 +0800 Subject: [PATCH 09/12] dropdata default true --- dbms/src/Storages/Page/workload/PSStressEnv.cpp | 14 +++++++------- dbms/src/Storages/Page/workload/PSStressEnv.h | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.cpp b/dbms/src/Storages/Page/workload/PSStressEnv.cpp index 1561a2d135c..ba6b83df61d 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.cpp +++ b/dbms/src/Storages/Page/workload/PSStressEnv.cpp @@ -50,10 +50,10 @@ StressEnv StressEnv::parse(int argc, char ** argv) desc.add_options()("help,h", "produce help message") // ("write_concurrency,W", value()->default_value(4), "number of write threads") // ("read_concurrency,R", value()->default_value(16), "number of read threads") // - ("clean_before_run,C", value()->default_value(false), "drop data before running") // - ("init_pages,I", value()->default_value(false), "init pages if not exist before running") // + ("dropdata", value()->default_value(true), "drop data before running") // + ("init_pages", value()->default_value(false), "init pages if not exist before running") // ("timeout,T", value()->default_value(600), "maximum run time (seconds). 
0 means run infinitely") // - ("writer_slots", value()->default_value(4), "number of PageStorage writer slots") // + ("writer_slots", value()->default_value(4), "number of PageStorage writer slots (for V2)") // ("read_delay_ms", value()->default_value(0), "millionseconds of read delay") // ("avg_page_size", value()->default_value(1 * 1024 * 1024), "avg size for each page(bytes). 1MiB by default") // ("paths,P", value>(), "store path(s)") // @@ -79,7 +79,7 @@ StressEnv StressEnv::parse(int argc, char ** argv) opt.num_writers = options["write_concurrency"].as(); opt.num_readers = options["read_concurrency"].as(); opt.init_pages = options["init_pages"].as(); - opt.clean_before_run = options["clean_before_run"].as(); + opt.dropdata = options["dropdata"].as(); opt.timeout_s = options["timeout"].as(); opt.read_delay_ms = options["read_delay_ms"].as(); opt.num_writer_slots = options["writer_slots"].as(); @@ -138,17 +138,17 @@ void StressEnv::setup() if (Poco::File file(path); file.exists()) { all_directories_not_exist = false; - if (clean_before_run) + if (dropdata) { file.remove(true); } } } - if (clean_before_run) + if (dropdata) LOG_INFO(StressEnv::logger, "All pages have been drop."); - if (clean_before_run || all_directories_not_exist) + if (dropdata || all_directories_not_exist) init_pages = true; setupSignal(); } diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.h b/dbms/src/Storages/Page/workload/PSStressEnv.h index 3af84fbdabf..584069279ef 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.h +++ b/dbms/src/Storages/Page/workload/PSStressEnv.h @@ -77,7 +77,7 @@ struct StressEnv size_t num_writers = 1; size_t num_readers = 4; bool init_pages = false; - bool clean_before_run = false; + bool dropdata = false; size_t gc_interval_s = 30; size_t timeout_s = 0; size_t read_delay_ms = 0; @@ -96,7 +96,7 @@ struct StressEnv return fmt::format( "{{ " "num_writers: {}, num_readers: {}, init_pages: {}" - ", clean_before_run: {}, timeout_s: {}, read_delay_ms: {}, 
num_writer_slots: {}" + ", dropdata: {}, timeout_s: {}, read_delay_ms: {}, num_writer_slots: {}" ", avg_page_size: {}, paths: [{}], failpoints: [{}]" ", gc_interval_s: {}" ", status_interval: {}, verify: {}" @@ -106,7 +106,7 @@ struct StressEnv num_writers, num_readers, init_pages, - clean_before_run, + dropdata, timeout_s, read_delay_ms, num_writer_slots, From 328d355765367d8ee2199cbbc4d818f3fe0bc708 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 7 Nov 2022 17:28:25 +0800 Subject: [PATCH 10/12] Format files --- dbms/src/Storages/Page/workload/HeavyRead.h | 3 +-- dbms/src/Storages/Page/workload/Normal.h | 3 +-- dbms/src/Storages/Page/workload/PSStressEnv.cpp | 2 +- dbms/src/Storages/Page/workload/PSWorkload.cpp | 3 +-- dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h | 2 +- 5 files changed, 5 insertions(+), 8 deletions(-) diff --git a/dbms/src/Storages/Page/workload/HeavyRead.h b/dbms/src/Storages/Page/workload/HeavyRead.h index 76893f5cf94..4f0c15dda46 100644 --- a/dbms/src/Storages/Page/workload/HeavyRead.h +++ b/dbms/src/Storages/Page/workload/HeavyRead.h @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include -#include "Storages/Page/workload/PSRunnable.h" - namespace DB::PS::tests { class HeavyRead : public StressWorkload diff --git a/dbms/src/Storages/Page/workload/Normal.h b/dbms/src/Storages/Page/workload/Normal.h index 6443240871d..9d24ae14864 100644 --- a/dbms/src/Storages/Page/workload/Normal.h +++ b/dbms/src/Storages/Page/workload/Normal.h @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include -#include "Storages/Page/workload/PSRunnable.h" - namespace DB::PS::tests { class NormalWorkload diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.cpp b/dbms/src/Storages/Page/workload/PSStressEnv.cpp index ba6b83df61d..5d3a62fd112 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.cpp +++ b/dbms/src/Storages/Page/workload/PSStressEnv.cpp @@ -59,7 +59,7 @@ StressEnv StressEnv::parse(int argc, char ** argv) ("paths,P", value>(), "store path(s)") // ("failpoints", value>(), "failpoint(s) to enable") // ("gc_interval", value()->default_value(30), "GC interval(seconds). 0 means no gc") // - ("status_interval", value()->default_value(1), "Status statistics interval(seconds). 0 means no statistics") // + ("status_interval", value()->default_value(5), "Status statistics interval(seconds). 0 means no statistics") // ("situation_mask,M", value()->default_value(0), "Run special tests sequentially, example -M 2") // ("verify", value()->default_value(true), "Run special tests sequentially with verify.") // ("running_ps_version,V", value()->default_value(3), "Select a version of PageStorage. 
2 or 3 can used"); diff --git a/dbms/src/Storages/Page/workload/PSWorkload.cpp b/dbms/src/Storages/Page/workload/PSWorkload.cpp index 06ff86865a7..009106621f7 100644 --- a/dbms/src/Storages/Page/workload/PSWorkload.cpp +++ b/dbms/src/Storages/Page/workload/PSWorkload.cpp @@ -17,13 +17,12 @@ #include #include #include +#include #include #include #include -#include "Storages/Page/workload/PSRunnable.h" - #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" #include diff --git a/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h index 3716aa6c916..5c161456a7b 100644 --- a/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h +++ b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h @@ -146,7 +146,7 @@ class PageStorageInMemoryCapacity : public StressWorkload stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(1); - writer->setBufferSizeRange(10ULL * DB::MB, 10ULL * DB::MB); + writer->setBufferSizeRange(10ULL * 1024, 10ULL * 1024); writer->setPageRange(single_writer_page_nums); }); From 7e3fabf9d33690b2b5e0fba72632b2f004db5b73 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 7 Nov 2022 18:27:50 +0800 Subject: [PATCH 11/12] Use startBackgroundTimer instead --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 1 + .../Page/workload/HeavyMemoryCostInGC.h | 6 +---- dbms/src/Storages/Page/workload/HeavyRead.h | 6 +---- .../Page/workload/HeavySkewWriteRead.h | 4 --- dbms/src/Storages/Page/workload/HeavyWrite.h | 6 +---- dbms/src/Storages/Page/workload/PSRunnable.h | 4 +++ dbms/src/Storages/Page/workload/PSStressEnv.h | 2 +- .../workload/PageStorageInMemoryCapacity.h | 27 +++++++++---------- 8 files changed, 21 insertions(+), 35 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index b11228fb687..40ee1b2a63b 100644 --- 
a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -1129,6 +1129,7 @@ std::set PageDirectory::getAllPageIds() const auto seq = sequence.load(); for (auto & [page_id, versioned] : mvcc_table_directory) { + // Only return the page_id that is visible if (versioned->isVisible(seq)) page_ids.insert(page_id); } diff --git a/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h index 8dd68dfed6b..497aef06340 100644 --- a/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h +++ b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h @@ -52,11 +52,7 @@ class HeavyMemoryCostInGC DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); - - stress_time = std::make_shared(options.timeout_s); - stress_time->start(); + startBackgroundTimer(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { writer->setBatchBufferNums(100); diff --git a/dbms/src/Storages/Page/workload/HeavyRead.h b/dbms/src/Storages/Page/workload/HeavyRead.h index 4f0c15dda46..c45c00e7c68 100644 --- a/dbms/src/Storages/Page/workload/HeavyRead.h +++ b/dbms/src/Storages/Page/workload/HeavyRead.h @@ -53,11 +53,7 @@ class HeavyRead : public StressWorkload initPageStorage(config, name()); initPages(MAX_PAGE_ID_DEFAULT); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); - - stress_time = std::make_shared(60); - stress_time->start(); + startBackgroundTimer(); { stop_watch.start(); startReader(options.num_readers, [](std::shared_ptr reader) -> void { diff --git a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h index 8fe8537ec4f..1edca02c1e6 100644 --- a/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h +++ b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.h @@ -72,10 +72,6 @@ class HeavySkewWriteRead : public StressWorkload pool.joinAll(); 
stop_watch.stop(); } - - // ps->traverse([](const DB::Page & page) { - // LOG_INFO(StressEnv::logger, "page_id={}", page.page_id); - // }); } bool verify() override diff --git a/dbms/src/Storages/Page/workload/HeavyWrite.h b/dbms/src/Storages/Page/workload/HeavyWrite.h index 41aba5486f0..fec9906cafc 100644 --- a/dbms/src/Storages/Page/workload/HeavyWrite.h +++ b/dbms/src/Storages/Page/workload/HeavyWrite.h @@ -50,11 +50,7 @@ class HeavyWrite : public StressWorkload DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); - - stress_time = std::make_shared(60); - stress_time->start(); + startBackgroundTimer(); { stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { diff --git a/dbms/src/Storages/Page/workload/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h index b1e6f19ecd4..72e2b81e46c 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -35,9 +35,12 @@ class PSRunnable : public Poco::Runnable size_t pages_used = 0; }; +// The random page id for adding and removing struct RandomPageId { + // The new page id to add to pagestorage DB::PageId page_id; + // The page ids to be removed from pagestorage DB::PageIdSet page_id_to_remove; explicit RandomPageId(DB::PageId new_page_id) @@ -51,6 +54,7 @@ struct RandomPageId } }; +// The shared status inside workload struct GlobalStat { // shared status between PSWindowWriter and PSWindowReader diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.h b/dbms/src/Storages/Page/workload/PSStressEnv.h index 584069279ef..b01bb0c1f32 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.h +++ b/dbms/src/Storages/Page/workload/PSStressEnv.h @@ -83,7 +83,7 @@ struct StressEnv size_t read_delay_ms = 0; size_t num_writer_slots = 1; size_t avg_page_size = 1024 * 1024; - size_t status_interval = 1; + size_t status_interval = 5; size_t situation_mask = 0; bool
verify = true; size_t running_ps_version = 3; diff --git a/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h index 5c161456a7b..632f99bcb5a 100644 --- a/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h +++ b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.h @@ -137,11 +137,8 @@ class PageStorageInMemoryCapacity : public StressWorkload DB::PageStorageConfig config; initPageStorage(config, name()); - metrics_dumper = std::make_shared(1); - metrics_dumper->start(); + startBackgroundTimer(); - stress_time = std::make_shared(100); - stress_time->start(); { stop_watch.start(); startWriter(options.num_writers, [](std::shared_ptr writer) -> void { @@ -161,17 +158,17 @@ class PageStorageInMemoryCapacity : public StressWorkload size_t page_writen = (single_writer_page_nums * options.num_writers); assert(page_writen != 0); - LOG_INFO(StressEnv::logger, fmt::format("After gen: {} pages" - "virtual memory used: {} MB," - "resident memory used: {} MB," - "total memory is {} , It is estimated that {} pages can be stored in the virtual memory," - "It is estimated that {} pages can be stored in the resident memory.", - page_writen, - virtual_used / DB::MB, - resident_used / DB::MB, - total_mem, - std::round(virtual_used) ? (total_mem / ((double)virtual_used / page_writen)) : 0, - std::round(resident_used) ? (total_mem / ((double)resident_used / page_writen)) : 0)); + LOG_INFO(StressEnv::logger, "After gen: {} pages" + "virtual memory used: {} MB," + "resident memory used: {} MB," + "total memory is {} , It is estimated that {} pages can be stored in the virtual memory," + "It is estimated that {} pages can be stored in the resident memory.", + page_writen, + virtual_used / DB::MB, + resident_used / DB::MB, + total_mem, + std::round(virtual_used) ? (total_mem / ((double)virtual_used / page_writen)) : 0, + std::round(resident_used) ? 
(total_mem / ((double)resident_used / page_writen)) : 0); } }; } // namespace DB::PS::tests From 173353759b23050f43bda51349525d7dde3483ab Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 10 Nov 2022 14:23:43 +0800 Subject: [PATCH 12/12] A small random range --- dbms/src/Storages/Page/workload/Normal.h | 6 +++++- dbms/src/Storages/Page/workload/PSStressEnv.cpp | 2 +- dbms/src/Storages/Page/workload/PSStressEnv.h | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/Page/workload/Normal.h b/dbms/src/Storages/Page/workload/Normal.h index 9d24ae14864..c25352c30bc 100644 --- a/dbms/src/Storages/Page/workload/Normal.h +++ b/dbms/src/Storages/Page/workload/Normal.h @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -57,10 +58,13 @@ class NormalWorkload LOG_INFO(StressEnv::logger, "All pages have been init."); } + RUNTIME_CHECK(options.avg_page_size > 1000); + stop_watch.start(); startWriter(options.num_writers, [this](std::shared_ptr w) { - w->setBufferSizeRange(options.avg_page_size, options.avg_page_size); + // A small random range + w->setBufferSizeRange(options.avg_page_size - 1000 / 2, options.avg_page_size + 1000 / 2); }); const size_t read_delay_ms = options.read_delay_ms; startReader(options.num_readers, [read_delay_ms](std::shared_ptr reader) -> void { diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.cpp b/dbms/src/Storages/Page/workload/PSStressEnv.cpp index 5d3a62fd112..49a651e7388 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.cpp +++ b/dbms/src/Storages/Page/workload/PSStressEnv.cpp @@ -55,7 +55,7 @@ StressEnv StressEnv::parse(int argc, char ** argv) ("timeout,T", value()->default_value(600), "maximum run time (seconds). 
0 means run infinitely") // ("writer_slots", value()->default_value(4), "number of PageStorage writer slots (for V2)") // ("read_delay_ms", value()->default_value(0), "millionseconds of read delay") // - ("avg_page_size", value()->default_value(1 * 1024 * 1024), "avg size for each page(bytes). 1MiB by default") // + ("avg_page_size", value()->default_value(2 * 1024 * 1024), "avg size for each page(bytes). 2 MiB by default") // ("paths,P", value>(), "store path(s)") // ("failpoints", value>(), "failpoint(s) to enable") // ("gc_interval", value()->default_value(30), "GC interval(seconds). 0 means no gc") // diff --git a/dbms/src/Storages/Page/workload/PSStressEnv.h b/dbms/src/Storages/Page/workload/PSStressEnv.h index b01bb0c1f32..3cb583a05af 100644 --- a/dbms/src/Storages/Page/workload/PSStressEnv.h +++ b/dbms/src/Storages/Page/workload/PSStressEnv.h @@ -82,7 +82,7 @@ struct StressEnv size_t timeout_s = 0; size_t read_delay_ms = 0; size_t num_writer_slots = 1; - size_t avg_page_size = 1024 * 1024; + size_t avg_page_size = 2 * 1024 * 1024; size_t status_interval = 5; size_t situation_mask = 0; bool verify = true;