Skip to content
8 changes: 5 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1126,12 +1126,14 @@ PageId PageDirectory::getMaxId() const
std::set<PageIdV3Internal> PageDirectory::getAllPageIds()
{
std::set<PageIdV3Internal> page_ids;
std::shared_lock read_lock(table_rw_mutex);

std::shared_lock read_lock(table_rw_mutex);
const auto seq = sequence.load();
for (auto & [page_id, versioned] : mvcc_table_directory)
{
(void)versioned;
page_ids.insert(page_id);
// Only return the page_id that is visible
if (versioned->isVisible(seq))
page_ids.insert(page_id);
}
return page_ids;
}
Expand Down
14 changes: 6 additions & 8 deletions dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,17 @@ class HeavyMemoryCostInGC
DB::PageStorageConfig config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
metrics_dumper->start();

stress_time = std::make_shared<StressTimeout>(30);
stress_time->start();
startBackgroundTimer();

startWriter<PSCommonWriter>(options.num_writers, [](std::shared_ptr<PSCommonWriter> writer) -> void {
writer->setBatchBufferNums(100);
writer->setBatchBufferSize(1);
writer->setBufferSizeRange(1, 1);
});

pool.joinAll();
stop_watch.stop();

gc = std::make_shared<PSGc>(ps);
gc = std::make_shared<PSGc>(ps, options.gc_interval_s);
gc->doGcOnce();
}

Expand All @@ -78,7 +74,9 @@ class HeavyMemoryCostInGC
void onFailed() override
{
LOG_WARNING(StressEnv::logger,
fmt::format("Memory Peak is {} , it should not bigger than {} ", metrics_dumper->getMemoryPeak(), 5 * 1024 * 1024));
"Memory Peak is {}, it should not bigger than {}",
metrics_dumper->getMemoryPeak(),
5 * 1024 * 1024);
}
};
} // namespace DB::PS::tests
9 changes: 3 additions & 6 deletions dbms/src/Storages/Page/workload/HeavyRead.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Page/workload/PSRunnable.h>
#include <Storages/Page/workload/PSWorkload.h>

namespace DB::PS::tests
Expand Down Expand Up @@ -50,13 +51,9 @@ class HeavyRead : public StressWorkload
{
DB::PageStorageConfig config;
initPageStorage(config, name());
PSWriter::fillAllPages(ps);
initPages(MAX_PAGE_ID_DEFAULT);

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
metrics_dumper->start();

stress_time = std::make_shared<StressTimeout>(60);
stress_time->start();
startBackgroundTimer();
{
stop_watch.start();
startReader<PSReader>(options.num_readers, [](std::shared_ptr<PSReader> reader) -> void {
Expand Down
28 changes: 11 additions & 17 deletions dbms/src/Storages/Page/workload/HeavySkewWriteRead.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Page/workload/PSStressEnv.h>
#include <Storages/Page/workload/PSWorkload.h>

namespace DB::PS::tests
Expand All @@ -38,8 +39,8 @@ class HeavySkewWriteRead : public StressWorkload
String desc() override
{
return fmt::format("Some of options will be ignored"
"`paths` will only used first one. which is {}. Data will store in {} ."
"Please cleanup folder after this test."
"`paths` will only used first one. which is {}. Data will store in {}. "
"Please cleanup folder after this test. "
"The current workload will elapse near 60 seconds",
options.paths[0],
options.paths[0] + "/" + name());
Expand All @@ -51,28 +52,21 @@ class HeavySkewWriteRead : public StressWorkload
DB::PageStorageConfig config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
metrics_dumper->start();
startBackgroundTimer();

stress_time = std::make_shared<StressTimeout>(60);
stress_time->start();
{
stop_watch.start();
startWriter<PSWindowWriter>(options.num_writers, [](std::shared_ptr<PSWindowWriter> writer) -> void {
const auto num_writers = options.num_writers;
startWriter<PSWindowWriter>(num_writers, [&](std::shared_ptr<PSWindowWriter> writer) {
writer->setBatchBufferNums(1);
writer->setBatchBufferRange(10 * 1024, 1 * DB::MB);
writer->setWindowSize(500);
writer->setNormalDistributionSigma(13);
writer->setBufferSizeRange(0, options.avg_page_size * 2);
writer->setNormalDistributionSigma(250);
});

auto num_writers = options.num_writers;

startReader<PSWindowReader>(options.num_readers, [num_writers](std::shared_ptr<PSWindowReader> reader) -> void {
reader->setPageReadOnce(5);
startReader<PSWindowReader>(options.num_readers, [](std::shared_ptr<PSWindowReader> reader) {
reader->setReadPageNums(5);
reader->setReadDelay(0);
reader->setWriterNums(num_writers);
reader->setWindowSize(100);
reader->setNormalDistributionSigma(9);
reader->setNormalDistributionSigma(250);
});

pool.joinAll();
Expand Down
8 changes: 2 additions & 6 deletions dbms/src/Storages/Page/workload/HeavyWrite.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,12 @@ class HeavyWrite : public StressWorkload
DB::PageStorageConfig config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
metrics_dumper->start();

stress_time = std::make_shared<StressTimeout>(60);
stress_time->start();
startBackgroundTimer();
{
stop_watch.start();
startWriter<PSCommonWriter>(options.num_writers, [](std::shared_ptr<PSCommonWriter> writer) -> void {
writer->setBatchBufferNums(4);
writer->setBatchBufferRange(1, 2 * DB::MB);
writer->setBufferSizeRange(1, 2 * DB::MB);
});

pool.joinAll();
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/Page/workload/HighValidBigFileGC.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class HighValidBigFileGCWorkload

startWriter<PSCommonWriter>(1, [](std::shared_ptr<PSCommonWriter> writer) -> void {
writer->setBatchBufferNums(1);
writer->setBatchBufferSize(100ULL * DB::MB);
writer->setBufferSizeRange(100ULL * DB::MB, 100ULL * DB::MB);
writer->setBatchBufferLimit(8ULL * DB::GB);
writer->setBatchBufferPageRange(1000000);
});
Expand All @@ -86,7 +86,7 @@ class HighValidBigFileGCWorkload
stop_watch.start();
startWriter<PSCommonWriter>(1, [](std::shared_ptr<PSCommonWriter> writer) -> void {
writer->setBatchBufferNums(4);
writer->setBatchBufferSize(2ULL * DB::MB);
writer->setBufferSizeRange(2ULL * DB::MB, 2ULL * DB::MB);
writer->setBatchBufferLimit(1ULL * DB::GB);
writer->setBatchBufferPageRange(1000000);
});
Expand All @@ -96,14 +96,14 @@ class HighValidBigFileGCWorkload
onDumpResult();
}

gc = std::make_shared<PSGc>(ps);
gc = std::make_shared<PSGc>(ps, options.gc_interval_s);
gc->doGcOnce();
gc_time_ms = gc->getElapsedMilliseconds();
{
stop_watch.start();
startWriter<PSCommonWriter>(1, [](std::shared_ptr<PSCommonWriter> writer) -> void {
writer->setBatchBufferNums(4);
writer->setBatchBufferSize(2ULL * DB::MB);
writer->setBufferSizeRange(2ULL * DB::MB, 2ULL * DB::MB);
writer->setBatchBufferLimit(1ULL * DB::GB);
writer->setBatchBufferPageRange(1000000);
});
Expand All @@ -123,7 +123,7 @@ class HighValidBigFileGCWorkload

void onFailed() override
{
LOG_WARNING(StressEnv::logger, fmt::format("GC time is {} , it should not bigger than {} ", gc_time_ms, 1 * 1000));
LOG_WARNING(StressEnv::logger, "GC time is {} , it should not bigger than {} ", gc_time_ms, 1 * 1000);
}

private:
Expand Down
14 changes: 3 additions & 11 deletions dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,14 @@ class HoldSnapshotsLongTime : public StressWorkload
DB::PageStorageConfig config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
metrics_dumper->start();

stress_time = std::make_shared<StressTimeout>(60);
stress_time->start();

scanner = std::make_shared<PSScanner>(ps);
scanner->start();
startBackgroundTimer();

// 90-100 snapshots will be generated.
{
stop_watch.start();
startWriter<PSWindowWriter>(options.num_writers, [](std::shared_ptr<PSWindowWriter> writer) -> void {
writer->setBatchBufferNums(1);
writer->setBatchBufferRange(10 * 1024, 1 * DB::MB);
writer->setWindowSize(500);
writer->setBufferSizeRange(10 * 1024, 1 * DB::MB);
writer->setNormalDistributionSigma(13);
});

Expand All @@ -79,7 +71,7 @@ class HoldSnapshotsLongTime : public StressWorkload
stop_watch.stop();
}

gc = std::make_shared<PSGc>(ps);
gc = std::make_shared<PSGc>(ps, options.gc_interval_s);
// Normal GC
gc->doGcOnce();

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/workload/MainEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int StressWorkload::mainEntry(int argc, char ** argv)
auto env = StressEnv::parse(argc, argv);
env.setup();

auto & mamager = StressWorkloadManger::getInstance();
auto & mamager = PageWorkloadFactory::getInstance();
mamager.setEnv(env);
mamager.runWorkload();

Expand All @@ -55,4 +55,4 @@ int StressWorkload::mainEntry(int argc, char ** argv)
DB::tryLogCurrentException("");
exit(-1);
}
}
}
23 changes: 11 additions & 12 deletions dbms/src/Storages/Page/workload/Normal.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Storages/Page/workload/PSRunnable.h>
#include <Storages/Page/workload/PSWorkload.h>

namespace DB::PS::tests
Expand Down Expand Up @@ -48,25 +50,22 @@ class NormalWorkload
config.num_write_slots = options.num_writer_slots;
initPageStorage(config);

if (options.avg_page_size_mb != 0)
{
PSWriter::setApproxPageSize(options.avg_page_size_mb);
}

// init all pages in PageStorage
if (options.init_pages || options.just_init_pages)
if (options.init_pages)
{
PSWriter::fillAllPages(ps);
static constexpr PageId MAX_PAGE_ID_DEFAULT = 1000;
initPages(MAX_PAGE_ID_DEFAULT);
LOG_INFO(StressEnv::logger, "All pages have been init.");
if (options.just_init_pages)
{
return;
}
}

RUNTIME_CHECK(options.avg_page_size > 1000);

stop_watch.start();

startWriter<PSWriter>(options.num_writers);
startWriter<PSWriter>(options.num_writers, [this](std::shared_ptr<PSWriter> w) {
// A small random range
w->setBufferSizeRange(options.avg_page_size - 1000 / 2, options.avg_page_size + 1000 / 2);
});
const size_t read_delay_ms = options.read_delay_ms;
startReader<PSReader>(options.num_readers, [read_delay_ms](std::shared_ptr<PSReader> reader) -> void {
reader->setReadDelay(read_delay_ms);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Page/workload/PSBackground.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void PSGc::start()
gc_timer.start(Poco::TimerCallback<PSGc>(*this, &PSGc::onTime));
}

void PSScanner::onTime(Poco::Timer & /*timer*/)
void PSSnapStatGetter::onTime(Poco::Timer & /*timer*/)
{
try
{
Expand All @@ -94,9 +94,9 @@ void PSScanner::onTime(Poco::Timer & /*timer*/)
}
}

void PSScanner::start()
void PSSnapStatGetter::start()
{
scanner_timer.start(Poco::TimerCallback<PSScanner>(*this, &PSScanner::onTime));
scanner_timer.start(Poco::TimerCallback<PSSnapStatGetter>(*this, &PSSnapStatGetter::onTime));
}

// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
Expand Down
Loading