Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -731,29 +731,25 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
.error(status);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
} else {
if (config::enable_quick_compaction && config::quick_compaction_batch_size > 0) {
for (int i = 0; i < succ_tablet_ids.size(); i++) {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
succ_tablet_ids[i]);
if (tablet != nullptr) {
tablet->publised_count++;
if (tablet->publised_count % config::quick_compaction_batch_size == 0) {
StorageEngine::instance()->submit_quick_compaction_task(tablet);
LOG(INFO) << "trigger quick compaction succ, tabletid:"
<< succ_tablet_ids[i]
<< ", publised:" << tablet->publised_count;
}
} else {
LOG(WARNING) << "trigger quick compaction failed, tabletid:"
<< succ_tablet_ids[i];
for (int i = 0; i < succ_tablet_ids.size(); i++) {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(succ_tablet_ids[i]);
if (tablet != nullptr) {
tablet->publised_count++;
if (tablet->publised_count % 10 == 0) {
StorageEngine::instance()->submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION);
LOG(INFO) << "trigger compaction succ, tabletid:" << succ_tablet_ids[i]
<< ", publised:" << tablet->publised_count;
}
} else {
LOG(WARNING) << "trigger compaction failed, tabletid:" << succ_tablet_ids[i];
}
LOG_INFO("successfully publish version")
.tag("signature", agent_task_req.signature)
.tag("transaction_id", publish_version_req.transaction_id)
.tag("tablets_num", succ_tablet_ids.size());
}
LOG_INFO("successfully publish version")
.tag("signature", agent_task_req.signature)
.tag("transaction_id", publish_version_req.transaction_id)
.tag("tablets_num", succ_tablet_ids.size());
}

status.to_thrift(&finish_task_request.task_status);
Expand Down
69 changes: 18 additions & 51 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,59 +253,41 @@ CONF_Bool(enable_vectorized_compaction, "true");
// whether enable vectorized schema change/material-view/rollup task.
CONF_Bool(enable_vectorized_alter_table, "true");

// check the configuration of auto compaction in seconds when auto compaction disabled
CONF_mInt32(check_auto_compaction_interval_seconds, "5");
// This config can be set to limit thread number in compaction thread pool.
CONF_mInt32(max_base_compaction_threads, "4");
CONF_mInt32(max_cumu_compaction_threads, "10");

CONF_mInt64(base_compaction_num_cumulative_deltas, "5");
CONF_mDouble(base_cumulative_delta_ratio, "0.3");
CONF_mInt64(base_compaction_interval_seconds_since_last_operation, "86400");
CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
CONF_Bool(enable_base_compaction_idle_sched, "true");

// dup key not compaction big files
CONF_Bool(enable_dup_key_base_compaction_skip_big_file, "true");
CONF_mInt64(base_compaction_min_rowset_num, "5");
CONF_mDouble(base_compaction_min_data_ratio, "0.3");
CONF_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");

// In size_based policy, output rowset of cumulative compaction total disk size exceed this config size,
// output rowset of cumulative compaction total disk size exceed this config size,
// this rowset will be given to base compaction, unit is m byte.
CONF_mInt64(cumulative_size_based_promotion_size_mbytes, "1024");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can clean up some unused configs, but we'd better not change the config names, for compatibility reasons. @morningman please help review this kind of change.

CONF_mInt64(compaction_promotion_size_mbytes, "1024");

// In size_based policy, output rowset of cumulative compaction total disk size exceed this config ratio of
// output rowset of cumulative compaction total disk size exceed this config ratio of
// base rowset's total disk size, this rowset will be given to base compaction. The value must be between
// 0 and 1.
CONF_mDouble(cumulative_size_based_promotion_ratio, "0.05");
CONF_mDouble(compaction_promotion_ratio, "0.05");

// In size_based policy, the smallest size of rowset promotion. When the rowset is less than this config, this
// the smallest size of rowset promotion. When the rowset is less than this config, this
// rowset will be not given to base compaction. The unit is m byte.
CONF_mInt64(cumulative_size_based_promotion_min_size_mbytes, "64");
CONF_mInt64(compaction_promotion_min_size_mbytes, "64");

// The lower bound size to do cumulative compaction. When total disk size of candidate rowsets is less than
// this size, size_based policy may not do to cumulative compaction. The unit is m byte.
CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");
CONF_mInt64(compaction_min_size_mbytes, "64");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is used for cu, we'd better use cumulative_compaction_min_size_mbytes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this config seems strange: if each insert is small (e.g. 1 row), then no rowset would be compacted until the total size reaches 64MB — yet the version count is limited to 2000?


// cumulative compaction policy: min and max delta file's number
CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000");

// if compaction of a tablet failed, this tablet should not be chosen to
// compaction until this interval passes.
CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds

// This config can be set to limit thread number in compaction thread pool.
CONF_mInt32(max_base_compaction_threads, "4");
CONF_mInt32(max_cumu_compaction_threads, "10");

// This config can be set to limit thread number in smallcompaction thread pool.
CONF_mInt32(quick_compaction_max_threads, "10");

// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");
CONF_mInt64(cumulative_compaction_min_deltas, "5");
CONF_mInt64(cumulative_compaction_max_deltas, "1000");

Comment on lines +284 to 285
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename deltas to segments?

// The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
CONF_mInt64(total_permits_for_compaction_score, "10000");

// sleep interval in ms after generated compaction tasks
CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10");
CONF_mInt32(generate_compaction_tasks_interval_ms, "10");

// Compaction task number per disk.
// Must be greater than 2, because Base compaction and Cumulative compaction have at least one thread each.
Expand All @@ -319,23 +301,17 @@ CONF_Validator(compaction_task_num_per_fast_disk,
// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");

// Merge log will be printed for each "row_step_for_compaction_merge_log" rows merged during compaction
CONF_mInt64(row_step_for_compaction_merge_log, "0");

// Threshold to logging compaction trace, in seconds.
CONF_mInt32(base_compaction_trace_threshold, "60");
CONF_mInt32(cumulative_compaction_trace_threshold, "10");
CONF_mBool(disable_compaction_trace_log, "true");

// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");

// Threshold to logging agent task trace, in seconds.
CONF_mInt32(agent_task_trace_threshold_sec, "2");

// time interval to record tablet scan count in second for the purpose of calculating tablet scan frequency
CONF_mInt64(tablet_scan_frequency_time_node_interval_second, "300");
// coefficient for tablet scan frequency and compaction score when finding a tablet for compaction
CONF_mInt32(compaction_tablet_scan_frequency_factor, "0");
CONF_mInt32(compaction_tablet_compaction_score_factor, "1");

// This config can be set to limit thread number in tablet migration thread pool.
CONF_Int32(min_tablet_migration_threads, "1");
CONF_Int32(max_tablet_migration_threads, "1");
Expand Down Expand Up @@ -814,15 +790,6 @@ CONF_mInt32(bloom_filter_predicate_check_row_num, "20480");

CONF_Bool(enable_decimalv3, "false");

//whether turn on quick compaction feature
CONF_Bool(enable_quick_compaction, "false");
// For continuous versions that rows less than quick_compaction_max_rows will trigger compaction quickly
CONF_Int32(quick_compaction_max_rows, "1000");
// min compaction versions
CONF_Int32(quick_compaction_batch_size, "10");
// do compaction min rowsets
CONF_Int32(quick_compaction_min_rowsets, "10");

// cooldown task configs
CONF_Int32(cooldown_thread_num, "5");
CONF_mInt64(generate_cooldown_task_interval_sec, "20");
Expand Down
17 changes: 8 additions & 9 deletions be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ Status BaseCompaction::execute_compact_impl() {
}

void BaseCompaction::_filter_input_rowset() {
// if enable dup key skip big file and no delete predicate
// if dup_key and no delete predicate
// we skip big files to save resources
if (!config::enable_dup_key_base_compaction_skip_big_file ||
_tablet->keys_type() != KeysType::DUP_KEYS || _tablet->delete_predicates().size() != 0) {
if (_tablet->keys_type() != KeysType::DUP_KEYS || _tablet->delete_predicates().size() != 0) {
return;
}
int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes * 1024 * 1024;
Expand Down Expand Up @@ -144,11 +143,11 @@ Status BaseCompaction::pick_rowsets_to_compact() {
}

// 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold
if (_input_rowsets.size() > config::base_compaction_num_cumulative_deltas) {
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->full_name()
<< ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
<< ", base_compaction_num_cumulative_rowsets="
<< config::base_compaction_num_cumulative_deltas;
<< config::base_compaction_min_rowset_num;
return Status::OK();
}

Expand All @@ -160,26 +159,26 @@ Status BaseCompaction::pick_rowsets_to_compact() {
cumulative_total_size += (*it)->data_disk_size();
}

double base_cumulative_delta_ratio = config::base_cumulative_delta_ratio;
double min_data_ratio = config::base_compaction_min_data_ratio;
if (base_size == 0) {
// base_size == 0 means this may be a base version [0-1], which has no data.
// set to 1 to void divide by zero
base_size = 1;
}
double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size;

if (cumulative_base_ratio > base_cumulative_delta_ratio) {
if (cumulative_base_ratio > min_data_ratio) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->full_name()
<< ", cumulative_total_size=" << cumulative_total_size
<< ", base_size=" << base_size
<< ", cumulative_base_ratio=" << cumulative_base_ratio
<< ", policy_ratio=" << base_cumulative_delta_ratio;
<< ", policy_min_data_ratio=" << min_data_ratio;
return Status::OK();
}

// 3. the interval since last base compaction reaches the threshold
int64_t base_creation_time = _input_rowsets[0]->creation_time();
int64_t interval_threshold = config::base_compaction_interval_seconds_since_last_operation;
int64_t interval_threshold = 86400;
int64_t interval_since_last_base_compaction = time(nullptr) - base_creation_time;
if (interval_since_last_base_compaction > interval_threshold) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->full_name()
Expand Down
55 changes: 2 additions & 53 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,57 +59,6 @@ Status Compaction::execute_compact() {
return st;
}

// Best-effort "quick" compaction pass over a tablet's recently published rowsets.
// Takes the cumulative-compaction lock non-blockingly; if the tablet is already
// under cumulative compaction, or a clone occurred since this task was queued,
// it bails out with an internal error instead of waiting.
// Returns Status::OK() when there was nothing to do or the compaction succeeded;
// lock/clone/version-continuity conflicts surface as error statuses.
Status Compaction::quick_rowsets_compact() {
    // try_to_lock: never block behind an in-progress cumulative compaction.
    std::unique_lock<std::mutex> lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
    if (!lock.owns_lock()) {
        LOG(WARNING) << "The tablet is under cumulative compaction. tablet="
                     << _tablet->full_name();
        return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR);
    }

    // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
    // for compaction may change. In this case, current compaction task should not be executed.
    if (_tablet->get_clone_occurred()) {
        _tablet->set_clone_occurred(false);
        return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
    }

    _input_rowsets.clear();
    // Snapshot the version count so the success log can report before/after.
    int version_count = _tablet->version_count();
    MonotonicStopWatch watch;
    watch.start();
    // Filled by the picker; later passed to do_compaction() as its permits cost.
    int64_t permits = 0;
    _tablet->pick_quick_compaction_rowsets(&_input_rowsets, &permits);
    // Keep only the longest consecutive version run; gaps are reported as
    // missed versions. NOTE: missed versions are logged but not treated as
    // an error here — continuity is re-checked below before compacting.
    std::vector<Version> missedVersions;
    find_longest_consecutive_version(&_input_rowsets, &missedVersions);
    if (missedVersions.size() != 0) {
        LOG(WARNING) << "quick_rowsets_compaction, find missed version"
                     << ",input_size:" << _input_rowsets.size();
    }
    int nums = _input_rowsets.size();
    // Only compact once enough small rowsets have accumulated.
    if (_input_rowsets.size() >= config::quick_compaction_min_rowsets) {
        Status st = check_version_continuity(_input_rowsets);
        if (!st.ok()) {
            LOG(WARNING) << "quick_rowsets_compaction failed, cause version not continuous";
            return st;
        }
        st = do_compaction(permits);
        if (!st.ok()) {
            // Drop the partially produced output rowset on failure.
            gc_output_rowset();
            LOG(WARNING) << "quick_rowsets_compaction failed";
        } else {
            LOG(INFO) << "quick_compaction succ"
                      << ", before_versions:" << version_count
                      << ", after_versions:" << _tablet->version_count()
                      << ", cost:" << (watch.elapsed_time() / 1000 / 1000) << "ms"
                      << ", merged: " << nums << ", batch:" << config::quick_compaction_batch_size
                      << ", segments:" << permits << ", tabletid:" << _tablet->tablet_id();
            _tablet->set_last_quick_compaction_success_time(UnixMillis());
        }
    }
    // NOTE(review): a failed do_compaction() is swallowed and OK is returned —
    // callers only observe lock/clone/continuity failures. Presumably intentional
    // (best-effort semantics) — confirm with callers.
    return Status::OK();
}

Status Compaction::do_compaction(int64_t permits) {
TRACE("start to do compaction");
_tablet->data_dir()->disks_compaction_score_increment(permits);
Expand Down Expand Up @@ -223,15 +172,15 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}

auto cumu_policy = _tablet->cumulative_compaction_policy();
DCHECK(cumu_policy);
LOG(INFO) << "succeed to do " << merge_type << compaction_name()
<< ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version
<< ", current_max_version=" << current_max_version
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
<< ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ". elapsed time=" << watch.get_elapse_second()
<< "s. cumulative_compaction_policy="
<< (cumu_policy == nullptr ? "quick" : cumu_policy->name())
<< "s. cumulative_compaction_policy=" << cumu_policy->name()
<< ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second());

return Status::OK();
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class Compaction {

// This is only for http CompactionAction
Status compact();
Status quick_rowsets_compact();

virtual Status prepare_compact() = 0;
Status execute_compact();
Expand Down
10 changes: 4 additions & 6 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,9 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {

size_t compaction_score = 0;
int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets(
_tablet.get(), candidate_rowsets,
config::max_cumulative_compaction_num_singleton_deltas,
config::min_cumulative_compaction_num_singleton_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);
_tablet.get(), candidate_rowsets, config::cumulative_compaction_max_deltas,
config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
&compaction_score);

// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION):
Expand All @@ -143,8 +142,7 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
int64_t last_cumu = _tablet->last_cumu_compaction_success_time();
int64_t last_base = _tablet->last_base_compaction_success_time();
if (last_cumu != 0 || last_base != 0) {
int64_t interval_threshold =
config::base_compaction_interval_seconds_since_last_operation * 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should define a const instead of using 86400 in different places.

int64_t interval_threshold = 86400 * 1000;
int64_t cumu_interval = now - last_cumu;
int64_t base_interval = now - last_base;
if (cumu_interval > interval_threshold && base_interval > interval_threshold) {
Expand Down
Loading