Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1089cda
compaction memory optimize and code refactor
weizuo93 Sep 18, 2020
d96e0ee
compaction memory optimize and code refactor
weizuo93 Sep 20, 2020
908788a
compaction memory optimize and code refactor
weizuo93 Sep 20, 2020
4bea5d5
compaction memory optimize and code refactor
weizuo93 Sep 20, 2020
72e4e79
compaction memory optimize and code refactor
weizuo93 Sep 20, 2020
506afdb
add compaction status for tablet
weizuo93 Sep 21, 2020
dce66cd
add lambda function for submit callback
weizuo93 Sep 21, 2020
ab4722a
code refactor
weizuo93 Sep 21, 2020
71cffec
code refactor
weizuo93 Sep 21, 2020
6468197
code refactor
weizuo93 Sep 21, 2020
5f784bf
code refactor
weizuo93 Sep 21, 2020
e9406e7
add metrics and comment for compaction limiter
weizuo93 Sep 22, 2020
f7b614c
modify metrics
weizuo93 Sep 22, 2020
fbe9b57
modify metrics
weizuo93 Sep 22, 2020
7200398
modify metrics
weizuo93 Sep 23, 2020
8652b55
modify metrics
weizuo93 Sep 23, 2020
644b126
modify config
weizuo93 Sep 23, 2020
6ce8595
modify config
weizuo93 Sep 23, 2020
b7dd382
modify metrics
weizuo93 Sep 24, 2020
256d258
modify metrics
weizuo93 Sep 24, 2020
9484f52
modify metrics
weizuo93 Sep 24, 2020
18c2933
modify metrics
weizuo93 Sep 25, 2020
5c6879b
optimize compaction tasks producer
weizuo93 Sep 27, 2020
7c343f4
modify compaction tasks producer
weizuo93 Sep 28, 2020
3e49f53
modify compaction tasks producer
weizuo93 Sep 28, 2020
ab5f6b3
optimize compaction memory and code refactor
weizuo93 Sep 28, 2020
a0206b3
compaction log modification
weizuo93 Sep 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,12 @@ namespace config {
// be policy
// whether disable automatic compaction task
CONF_mBool(disable_auto_compaction, "false");
// check the configuration of auto compaction in seconds when auto compaction disabled
CONF_mInt32(check_auto_compaction_interval_seconds, "5");

// CONF_Int64(base_compaction_start_hour, "20");
// CONF_Int64(base_compaction_end_hour, "7");
CONF_mInt32(base_compaction_check_interval_seconds, "60");
CONF_mInt64(base_compaction_num_cumulative_deltas, "5");
CONF_Int32(base_compaction_num_threads_per_disk, "1");
CONF_mDouble(base_cumulative_delta_ratio, "0.3");
CONF_mInt64(base_compaction_interval_seconds_since_last_operation, "86400");
CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
Expand All @@ -296,10 +296,8 @@ namespace config {
CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");

// cumulative compaction policy: max delta file's size unit:B
CONF_mInt32(cumulative_compaction_check_interval_seconds, "10");
CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000");
CONF_Int32(cumulative_compaction_num_threads_per_disk, "1");
CONF_mInt64(cumulative_compaction_budgeted_bytes, "104857600");
// CONF_Int32(cumulative_compaction_write_mbytes_per_sec, "100");
// cumulative compaction skips recently published deltas in order to prevent
Expand All @@ -310,13 +308,19 @@ namespace config {
// if compaction of a tablet failed, this tablet should not be chosen to
// compaction until this interval passes.
CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min
// Too many compaction tasks may run out of memory.
// This config is to limit the max concurrency of running compaction tasks.
// -1 means no limit, and the max concurrency will be:
// C = (cumulative_compaction_num_threads_per_disk + base_compaction_num_threads_per_disk) * dir_num
// set it to larger than C will be set to equal to C.
// This config can be set to 0, which means to forbid any compaction, for some special cases.
CONF_Int32(max_compaction_concurrency, "-1");

// This config can be set to limit thread number in compaction thread pool.
CONF_mInt32(min_compaction_threads, "10");
CONF_mInt32(max_compaction_threads, "10");

// The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
CONF_mInt64(total_permits_for_compaction_score, "10000");

// Compaction task number per disk.
CONF_mInt32(compaction_task_num_per_disk, "2");

// Number of cumulative compaction rounds for each base compaction round when generating compaction tasks.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");

// Threshold for logging compaction trace, in seconds.
CONF_mInt32(base_compaction_trace_threshold, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ add_library(Olap STATIC
bloom_filter_writer.cpp
byte_buffer.cpp
compaction.cpp
compaction_permit_limiter.cpp
comparison_predicate.cpp
compress.cpp
cumulative_compaction.cpp
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ OLAPStatus BaseCompaction::compact() {
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());

// 2. do base compaction, merge rowsets
RETURN_NOT_OK(do_compaction());
int64_t permits = _tablet->calc_base_compaction_score();
RETURN_NOT_OK(do_compaction(permits));
TRACE("compaction finished");

// 3. set state to success
Expand Down
24 changes: 10 additions & 14 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ using std::vector;

namespace doris {

Semaphore Compaction::_concurrency_sem;

Compaction::Compaction(TabletSharedPtr tablet, const std::string& label, const std::shared_ptr<MemTracker>& parent_tracker)
: _mem_tracker(MemTracker::CreateTracker(-1, label, parent_tracker)),
_readers_tracker(MemTracker::CreateTracker(-1, "readers tracker", _mem_tracker)),
Expand All @@ -37,20 +35,17 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label, const s

Compaction::~Compaction() {}

OLAPStatus Compaction::init(int concurreny) {
_concurrency_sem.set_count(concurreny);
return OLAP_SUCCESS;
}

OLAPStatus Compaction::do_compaction() {
_concurrency_sem.wait();
TRACE("got concurrency lock and start to do compaction");
OLAPStatus st = do_compaction_impl();
_concurrency_sem.signal();
// Runs one compaction while accounting for it in the per-disk gauges.
// `permits` is added to the data dir's compaction score gauge (and the running-task
// gauge is bumped by one) before do_compaction_impl() runs; both adjustments are
// reverted afterwards regardless of the returned status.
OLAPStatus Compaction::do_compaction(int64_t permits) {
    TRACE("start to do compaction");

    // Publish the in-flight task on the disk metrics.
    _tablet->data_dir()->disks_compaction_score_increment(permits);
    _tablet->data_dir()->disks_compaction_num_increment(1);

    const OLAPStatus result = do_compaction_impl(permits);

    // Undo the accounting now that the task has finished.
    _tablet->data_dir()->disks_compaction_score_increment(-permits);
    _tablet->data_dir()->disks_compaction_num_increment(-1);

    return result;
}

OLAPStatus Compaction::do_compaction_impl() {
OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
OlapStopWatch watch;

// 1. prepare input and output parameters
Expand All @@ -68,7 +63,8 @@ OLAPStatus Compaction::do_compaction_impl() {
_tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash);

LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output version is=" << _output_version.first << "-" << _output_version.second;
<< ", output version is=" << _output_version.first << "-" << _output_version.second
<< ", score: " << permits;

RETURN_NOT_OK(construct_output_rowset_writer());
RETURN_NOT_OK(construct_input_rowset_readers());
Expand Down
9 changes: 2 additions & 7 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ class Compaction {

virtual OLAPStatus compact() = 0;

static OLAPStatus init(int concurreny);

protected:
virtual OLAPStatus pick_rowsets_to_compact() = 0;
virtual std::string compaction_name() const = 0;
virtual ReaderType compaction_type() const = 0;

OLAPStatus do_compaction();
OLAPStatus do_compaction_impl();
OLAPStatus do_compaction(int64_t permits);
OLAPStatus do_compaction_impl(int64_t permits);

void modify_rowsets();
OLAPStatus gc_unused_rowsets();
Expand All @@ -68,9 +66,6 @@ class Compaction {
OLAPStatus check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
OLAPStatus check_correctness(const Merger::Statistics& stats);

// semaphore used to limit the concurrency of running compaction tasks
static Semaphore _concurrency_sem;

private:
// get num rows from segment group meta of input rowsets.
// return -1 if these are not alpha rowsets.
Expand Down
50 changes: 50 additions & 0 deletions be/src/olap/compaction_permit_limiter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/compaction_permit_limiter.h"

namespace doris {

CompactionPermitLimiter::CompactionPermitLimiter() : _used_permits(0) {}

bool CompactionPermitLimiter::request(int64_t permits) {
if (permits > config::total_permits_for_compaction_score) {
// when tablet's compaction score is larger than "config::total_permits_for_compaction_score",
// it's necessary to do compaction for this tablet because this tablet will not get "permits"
// anyway. otherwise, compaction task for this tablet will not be executed forever.
std::unique_lock<std::mutex> lock(_permits_mutex);
_permits_cv.wait(lock, [=] {
return _used_permits == 0 ||
_used_permits + permits <= config::total_permits_for_compaction_score;
});
} else {
if (_used_permits + permits > config::total_permits_for_compaction_score) {
std::unique_lock<std::mutex> lock(_permits_mutex);
_permits_cv.wait(lock, [=] {
return _used_permits + permits <= config::total_permits_for_compaction_score;
});
}
}
_used_permits += permits;
return true;
}

void CompactionPermitLimiter::release(int64_t permits) {
_used_permits -= permits;
_permits_cv.notify_one();
}
} // namespace doris
49 changes: 49 additions & 0 deletions be/src/olap/compaction_permit_limiter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <condition_variable>
#include <mutex>

#include "common/config.h"
#include "olap/utils.h"

namespace doris {

// Admission control for compaction tasks, which to some extent also bounds the memory
// they consume.
//
// A compaction task has to obtain "permits" before it may execute; the "compaction
// score" of the tablet is used as the number of permits. Once the sum of permits held
// by executing tasks reaches the configured threshold, further tasks are not admitted
// until finished tasks hand their permits back.
class CompactionPermitLimiter {
public:
    CompactionPermitLimiter();
    virtual ~CompactionPermitLimiter() = default;

    // Acquires `permits`, waiting on the condition variable while they cannot be
    // granted. Returns true once the permits are held.
    bool request(int64_t permits);

    // Hands back `permits` obtained earlier through request() and notifies waiters.
    void release(int64_t permits);

private:
    // Sum of the "permits" currently held by executing compaction tasks.
    AtomicInt64 _used_permits;
    // Mutex used together with _permits_cv when a request has to wait.
    std::mutex _permits_mutex;
    // Signalled by release() so that waiting requests re-check the available permits.
    std::condition_variable _permits_cv;
};
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ OLAPStatus CumulativeCompaction::compact() {
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());

// 3. do cumulative compaction, merge rowsets
RETURN_NOT_OK(do_compaction());
int64_t permits = _tablet->calc_cumulative_compaction_score();
RETURN_NOT_OK(do_compaction(permits));
TRACE("compaction finished");

// 4. set state to success
Expand Down
14 changes: 13 additions & 1 deletion be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_total_capacity, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_avail_capacity, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_data_used_capacity, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_state, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_compaction_score, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_compaction_num, MetricUnit::NOUNIT);

static const char* const kMtabPath = "/etc/mtab";
static const char* const kTestFilePath = "/.testfile";
Expand All @@ -83,6 +85,8 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity);
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_data_used_capacity);
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state);
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_score);
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_num);
}

DataDir::~DataDir() {
Expand Down Expand Up @@ -986,7 +990,7 @@ void DataDir::update_user_data_size(int64_t size) {
bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
double used_pct = (_disk_capacity_bytes - _available_bytes + incoming_data_size) /
(double)_disk_capacity_bytes;
int64_t left_bytes = _disk_capacity_bytes - _available_bytes - incoming_data_size;
int64_t left_bytes = _available_bytes - incoming_data_size;

if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
Expand All @@ -996,4 +1000,12 @@ bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
}
return false;
}

void DataDir::disks_compaction_score_increment(int64_t delta) {
disks_compaction_score->increment(delta);
}

void DataDir::disks_compaction_num_increment(int64_t delta) {
disks_compaction_num->increment(delta);
}
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class DataDir {

std::set<TabletInfo> tablet_set() { return _tablet_set; }

void disks_compaction_score_increment(int64_t delta);

void disks_compaction_num_increment(int64_t delta);

private:
std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
Status _init_cluster_id();
Expand Down Expand Up @@ -201,6 +205,8 @@ class DataDir {
IntGauge* disks_avail_capacity;
IntGauge* disks_data_used_capacity;
IntGauge* disks_state;
IntGauge* disks_compaction_score;
IntGauge* disks_compaction_num;
};

} // namespace doris
Loading