Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,6 @@ CONF_Bool(enable_base_compaction_idle_sched, "true");
CONF_Bool(enable_dup_key_base_compaction_skip_big_file, "true");
CONF_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");

// config the cumulative compaction policy
// Valid configs: num_based, size_based
// num_based policy: the original version of cumulative compaction, in which each cumulative version is compacted only once.
// size_based policy: an optimized version of cumulative compaction, targeting the use cases requiring
// lower write amplification, trading off read amplification and space amplification.
CONF_mString(cumulative_compaction_policy, "size_based");
CONF_Validator(cumulative_compaction_policy, [](const std::string config) -> bool {
return config == "size_based" || config == "num_based";
});

// In size_based policy, when the total disk size of the output rowset of a cumulative compaction exceeds this
// config size, the rowset will be handed over to base compaction. The unit is MB.
CONF_mInt64(cumulative_size_based_promotion_size_mbytes, "1024");
Expand Down
23 changes: 2 additions & 21 deletions be/src/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,10 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
timer.start();

std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
_create_cumulative_compaction_policy();
if (tablet->get_cumulative_compaction_policy() == nullptr ||
tablet->get_cumulative_compaction_policy()->name() !=
cumulative_compaction_policy->name()) {
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
if (tablet->get_cumulative_compaction_policy() == nullptr) {
tablet->set_cumulative_compaction_policy(cumulative_compaction_policy);
}

Status res = Status::OK();
if (compaction_type == PARAM_COMPACTION_BASE) {
BaseCompaction base_compaction(tablet);
Expand Down Expand Up @@ -257,20 +254,4 @@ void CompactionAction::handle(HttpRequest* req) {
}
}

// Creates a cumulative compaction policy object for the policy name that is
// currently stored in config::cumulative_compaction_policy.
// The configured name is upper-cased before it is compared and handed to the
// factory, since the policy name constants are spelled in upper case.
std::shared_ptr<CumulativeCompactionPolicy>
CompactionAction::_create_cumulative_compaction_policy() {
    // Take a private copy of the mutable string config while holding its lock.
    std::string policy_name;
    {
        std::lock_guard<std::mutex> guard(*config::get_mutable_string_config_lock());
        policy_name = config::cumulative_compaction_policy;
    }
    boost::to_upper(policy_name);

    const bool is_size_based = (policy_name == CUMULATIVE_SIZE_BASED_POLICY);
    if (is_size_based) {
        // the size_based policy has extra config values that must be checked first
        StorageEngine::instance()->check_cumulative_compaction_config();
    }

    return CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(policy_name);
}
} // end namespace doris
2 changes: 0 additions & 2 deletions be/src/http/action/compaction_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class CompactionAction : public HttpHandler {
/// check param and fetch tablet_id from req
Status _check_param(HttpRequest* req, uint64_t* tablet_id);

std::shared_ptr<CumulativeCompactionPolicy> _create_cumulative_compaction_policy();

private:
CompactionActionType _type;
};
Expand Down
142 changes: 2 additions & 140 deletions be/src/olap/cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,121 +345,6 @@ int SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
return 0;
}

// Advances the tablet's cumulative layer point to the version that directly
// follows the end version of the last input rowset.
// `_output_rowset` and `last_delete_version` are not read by this policy.
// NOTE(review): assumes input_rowsets is non-empty (back() is called
// unconditionally) -- confirm against callers.
void NumBasedCumulativeCompactionPolicy::update_cumulative_point(
        Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
        RowsetSharedPtr _output_rowset, Version& last_delete_version) {
    const RowsetSharedPtr& last_input = input_rowsets.back();
    const int64_t next_point = last_input->end_version() + 1;
    tablet->set_cumulative_layer_point(next_point);
}

// Selects the rowsets to feed into a cumulative compaction.
//
// Candidates are visited in order and appended to `input_rowsets` while
// their compaction scores are summed into `*compaction_score`. Picking stops
// when the summed score has reached `max_compaction_score`, or when a delete
// version is met after at least one rowset has been picked. A delete version
// met before anything was picked is skipped. Every delete version met is
// recorded in `*last_delete_version`.
//
// If no delete version was recorded (last_delete_version->first == -1) and
// the summed score is below `min_compaction_score`, the selection is dropped
// again by clearing `input_rowsets`.
//
// Returns the number of rowsets appended by the loop; this count is not reset
// when the selection is dropped at the end.
int NumBasedCumulativeCompactionPolicy::pick_input_rowsets(
        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
        const int64_t max_compaction_score, const int64_t min_compaction_score,
        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
        size_t* compaction_score) {
    size_t& total_score = *compaction_score;
    total_score = 0;
    int picked_count = 0;

    for (const RowsetSharedPtr& candidate : candidate_rowsets) {
        const bool is_delete_version = tablet->version_for_delete_predicate(candidate->version());
        if (is_delete_version) {
            *last_delete_version = candidate->version();
            if (!input_rowsets->empty()) {
                // Rowsets were already picked ahead of this delete version:
                // compact them first before they are handed to base compaction.
                break;
            }
            // Nothing picked yet: skip the delete version and start over.
            input_rowsets->clear();
            picked_count = 0;
            total_score = 0;
            continue;
        }

        if (total_score >= max_compaction_score) {
            // enough segments collected
            break;
        }

        total_score += candidate->rowset_meta()->get_compaction_score();
        input_rowsets->push_back(candidate);
        ++picked_count;
    }

    // Keep the selection only if there are enough segments, or if a delete
    // version was encountered after other versions; otherwise drop it.
    const bool no_delete_version_seen = (last_delete_version->first == -1);
    if (!input_rowsets->empty() && no_delete_version_seen &&
        total_score < min_compaction_score) {
        input_rowsets->clear();
    }
    return picked_count;
}

// Adds to `*score` the compaction score of every rowset meta whose start
// version is at or after `current_cumulative_point`.
// `*score` is accumulated into, not reset, and `state` is not consulted.
void NumBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
        TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets,
        const int64_t current_cumulative_point, uint32_t* score) {
    // The metas are not sorted, so each one has to be examined; the scan
    // cannot stop at the first rowset that lies before the point.
    for (auto& meta : all_rowsets) {
        if (meta->start_version() >= current_cumulative_point) {
            *score += meta->get_compaction_score();
        }
    }
}

// Computes the initial cumulative point of a tablet.
//
// `*ret_cumulative_point` is first set to Tablet::K_INVALID_CUMULATIVE_POINT.
// If `current_cumulative_point` is already valid nothing else happens.
// Otherwise the rowset metas are ordered by start version and:
//  - for a TABLET_RUNNING tablet, the metas are walked while their versions
//    are contiguous; the point becomes the start version of the first rowset
//    that is segments-overlapping or a singleton delta, or else the version
//    after the last contiguous rowset;
//  - for a TABLET_NOTREADY tablet, the point becomes the start version of the
//    first rowset whose start version is greater than 0.
void NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(
        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
        // The point is calculated only once; afterwards it is updated along
        // with the compaction process.
        return;
    }

    // Copy the metas and order them by ascending start version. Comparing the
    // start version alone is enough because two versions never overlap.
    std::list<RowsetMetaSharedPtr> ordered_metas(all_metas.begin(), all_metas.end());
    ordered_metas.sort([](const RowsetMetaSharedPtr& lhs, const RowsetMetaSharedPtr& rhs) {
        return lhs->version().first < rhs->version().first;
    });

    if (tablet->tablet_state() == TABLET_RUNNING) {
        int64_t last_end_version = -1;
        for (const RowsetMetaSharedPtr& meta : ordered_metas) {
            const auto start_version = meta->version().first;
            if (start_version > last_end_version + 1) {
                // a hole in the version chain: stop here
                break;
            }
            // Stop at the first rowset whose segments overlap or which is a
            // singleton delta; it becomes the cumulative point.
            if (meta->is_segments_overlapping() || meta->is_singleton_delta()) {
                *ret_cumulative_point = start_version;
                break;
            }

            last_end_version = meta->version().second;
            *ret_cumulative_point = last_end_version + 1;
        }
    } else if (tablet->tablet_state() == TABLET_NOTREADY) {
        // Tablet under alter process: use the version next to the base
        // version as the cumulative point.
        for (const RowsetMetaSharedPtr& meta : ordered_metas) {
            const auto start_version = meta->version().first;
            if (start_version > 0) {
                *ret_cumulative_point = start_version;
                break;
            }
        }
    }
}

void CumulativeCompactionPolicy::pick_candidate_rowsets(
const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets) {
Expand All @@ -472,31 +357,8 @@ void CumulativeCompactionPolicy::pick_candidate_rowsets(
}

std::shared_ptr<CumulativeCompactionPolicy>
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) {
CompactionPolicy policy_type;
_parse_cumulative_compaction_policy(type, &policy_type);

if (policy_type == NUM_BASED_POLICY) {
return std::unique_ptr<CumulativeCompactionPolicy>(
new NumBasedCumulativeCompactionPolicy());
} else if (policy_type == SIZE_BASED_POLICY) {
return std::unique_ptr<CumulativeCompactionPolicy>(
new SizeBasedCumulativeCompactionPolicy());
}

return std::shared_ptr<CumulativeCompactionPolicy>(new NumBasedCumulativeCompactionPolicy());
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() {
return std::unique_ptr<CumulativeCompactionPolicy>(new SizeBasedCumulativeCompactionPolicy());
}

// Translates a policy name into the matching CompactionPolicy enum value and
// stores it in `*policy_type`. A name that matches neither known policy is
// logged as a warning and treated as the num based policy.
void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(
        std::string type, CompactionPolicy* policy_type) {
    if (type == CUMULATIVE_SIZE_BASED_POLICY) {
        *policy_type = SIZE_BASED_POLICY;
        return;
    }

    // Everything else resolves to num based; only an unrecognized name is
    // worth a warning.
    if (type != CUMULATIVE_NUM_BASED_POLICY) {
        LOG(WARNING) << "parse cumulative compaction policy error " << type << ", default use "
                     << CUMULATIVE_NUM_BASED_POLICY;
    }
    *policy_type = NUM_BASED_POLICY;
}
} // namespace doris
65 changes: 2 additions & 63 deletions be/src/olap/cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,8 @@ namespace doris {

class Tablet;

/// This CompactionPolicy enum represents the type of cumulative compaction policy.
/// It currently has two values, NUM_BASED_POLICY and SIZE_BASED_POLICY.
/// NUM_BASED_POLICY means the compaction policy is implemented by the num based policy.
/// SIZE_BASED_POLICY means the compaction policy is implemented by the size based policy.
enum CompactionPolicy {
NUM_BASED_POLICY = 0,
SIZE_BASED_POLICY = 1,
};

const static std::string CUMULATIVE_NUM_BASED_POLICY = "NUM_BASED";
const static std::string CUMULATIVE_SIZE_BASED_POLICY = "SIZE_BASED";

/// This class CumulativeCompactionPolicy is the base class of cumulative compaction policy.
/// It defines the policy to do cumulative compaction. It has different derived classes, which implement
/// concrete cumulative compaction algorithm. The policy is configured by conf::cumulative_compaction_policy.
Expand Down Expand Up @@ -115,52 +106,6 @@ class CumulativeCompactionPolicy {
virtual std::string name() = 0;
};

/// Num based cumulative compaction policy implementation. The num based policy, which derives from CumulativeCompactionPolicy,
/// is the early basic algorithm. This policy uses linear structure to compact rowsets. The cumulative rowsets compact only once and
/// then the output will do base compaction. It can make segments of rowsets in order and compact small rowsets to a bigger one.
class NumBasedCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
public:
/// Constructor function of NumBasedCumulativeCompactionPolicy.
/// It takes no arguments; the tablet to operate on is passed to each
/// policy method instead.
NumBasedCumulativeCompactionPolicy() : CumulativeCompactionPolicy() {}

/// Destructor function of NumBasedCumulativeCompactionPolicy.
~NumBasedCumulativeCompactionPolicy() {}

/// Num based cumulative compaction policy implements pick input rowsets function.
/// Its main policy is picking rowsets from candidate rowsets by comparing the accumulated compaction_score with
/// max_compaction_score, or by checking whether there is a delete version rowset.
int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score) override;

/// Num based cumulative compaction policy implements update cumulative point function.
/// Its main policy is using the last input version to update the cumulative point. The aim is that every rowset
/// is compacted only once.
void update_cumulative_point(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr _output_rowset,
Version& last_delete_version) override;

/// Num based cumulative compaction policy implements calculate cumulative point function.
/// When the first time the tablet does compact, this calculation is executed. Its main policy is to find the first rowset
/// which is of segments_overlapping type, which represents that this rowset is not compacted, and use its version as cumulative point.
void calculate_cumulative_point(Tablet* tablet,
const std::vector<RowsetMetaSharedPtr>& all_rowsets,
int64_t current_cumulative_point,
int64_t* cumulative_point) override;

/// Num based cumulative compaction policy implements calc cumulative compaction score function.
/// Its main policy is calculating the accumulated compaction score after the current cumulative_point in tablet.
void calc_cumulative_compaction_score(TabletState state,
const std::vector<RowsetMetaSharedPtr>& all_rowsets,
int64_t current_cumulative_point,
uint32_t* score) override;

/// Returns the name of this policy, CUMULATIVE_NUM_BASED_POLICY.
std::string name() override { return CUMULATIVE_NUM_BASED_POLICY; }
};

/// SizeBased cumulative compaction policy implementation. SizeBased policy, which derives from CumulativeCompactionPolicy, is an optimized
/// version of num based cumulative compaction policy. This policy also uses linear structure to compact rowsets. The cumulative rowsets
/// can do compaction when they are in same level size. And when output rowset exceeds the promotion ratio of base size or min promotion
Expand Down Expand Up @@ -248,13 +193,7 @@ class CumulativeCompactionPolicyFactory {
public:
/// Static factory function. It can product different policy according to the `policy` parameter and use tablet ptr
/// to construct the policy. Now it can product size based and num based policies.
static std::shared_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy(
std::string policy);

private:
/// It is a static function to help to check the policy config and convert to CompactionPolicy enum variable
static void _parse_cumulative_compaction_policy(std::string policy,
CompactionPolicy* policy_type);
static std::shared_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy();
};

} // namespace doris
21 changes: 3 additions & 18 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,6 @@ void StorageEngine::_compaction_tasks_producer_callback() {
std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
_update_cumulative_compaction_policy();

std::vector<TabletSharedPtr> tablets_compaction;
uint32_t max_compaction_score = 0;

Expand Down Expand Up @@ -563,21 +562,9 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
}

void StorageEngine::_update_cumulative_compaction_policy() {
std::string current_policy = "";
{
std::lock_guard<std::mutex> lock(*config::get_mutable_string_config_lock());
current_policy = config::cumulative_compaction_policy;
}
boost::to_upper(current_policy);
if (_cumulative_compaction_policy == nullptr ||
_cumulative_compaction_policy->name() != current_policy) {
if (current_policy == CUMULATIVE_SIZE_BASED_POLICY) {
// check size_based cumulative compaction config
check_cumulative_compaction_config();
}
if (_cumulative_compaction_policy == nullptr) {
_cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
current_policy);
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
}
}

Expand Down Expand Up @@ -673,9 +660,7 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet,
CompactionType compaction_type) {
_update_cumulative_compaction_policy();
if (tablet->get_cumulative_compaction_policy() == nullptr ||
tablet->get_cumulative_compaction_policy()->name() !=
_cumulative_compaction_policy->name()) {
if (tablet->get_cumulative_compaction_policy() == nullptr) {
tablet->set_cumulative_compaction_policy(_cumulative_compaction_policy);
}
return _submit_compaction_task(tablet, compaction_type);
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ class StorageEngine {
std::vector<TabletSharedPtr> _generate_compaction_tasks(CompactionType compaction_type,
std::vector<DataDir*>& data_dirs,
bool check_score);

void _update_cumulative_compaction_policy();

bool _push_tablet_into_submitted_compaction(TabletSharedPtr tablet,
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ Status Tablet::_init_once_action() {
#ifdef BE_TEST
// init cumulative compaction policy by type
_cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
_cumulative_compaction_type);
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
#endif

RowsetVector rowset_vec;
Expand Down
Loading