Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ BaseCompaction::BaseCompaction(TabletSharedPtr tablet, const std::string& label,
BaseCompaction::~BaseCompaction() {}

OLAPStatus BaseCompaction::compact() {
RETURN_NOT_OK(prepare_compact());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who will call BaseCompaction::compact() now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who will call BaseCompaction::compact() now?

BaseCompaction::compact() will still be called in "be/src/http/action/compaction_action.cpp" for /api/compaction/run.

RETURN_NOT_OK(execute_compact());

return OLAP_SUCCESS;
}

OLAPStatus BaseCompaction::prepare_compact() {
if (!_tablet->init_succeeded()) {
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
Expand All @@ -44,9 +51,28 @@ OLAPStatus BaseCompaction::compact() {
RETURN_NOT_OK(pick_rowsets_to_compact());
TRACE("rowsets picked");
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());
_tablet->set_clone_occurred(false);

return OLAP_SUCCESS;
}

OLAPStatus BaseCompaction::execute_compact() {
MutexLock lock(_tablet->get_base_lock(), TRY_LOCK);
if (!lock.own_lock()) {
LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name();
return OLAP_ERR_BE_TRY_BE_LOCK_ERROR;
}
TRACE("got base compaction lock");

// Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
// for compaction may change. In this case, current compaction task should not be executed.
if (_tablet->get_clone_occurred()) {
_tablet->set_clone_occurred(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment to explain why setting clone occurred here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

return OLAP_ERR_BE_CLONE_OCCURRED;
}

// 2. do base compaction, merge rowsets
int64_t permits = _tablet->calc_compaction_score(CompactionType::BASE_COMPACTION);
int64_t permits = get_compaction_permits();
RETURN_NOT_OK(do_compaction(permits));
TRACE("compaction finished");

Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ class BaseCompaction : public Compaction {

OLAPStatus compact() override;

OLAPStatus prepare_compact() override;
OLAPStatus execute_compact() override;

std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }

protected:
OLAPStatus pick_rowsets_to_compact() override;
std::string compaction_name() const override { return "base compaction"; }
Expand Down
14 changes: 11 additions & 3 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
_tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash);

LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output version is=" << _output_version.first << "-" << _output_version.second
<< ", score: " << permits;
<< ", output_version=" << _output_version.first << "-" << _output_version.second
<< ", permits: " << permits;

RETURN_NOT_OK(construct_output_rowset_writer());
RETURN_NOT_OK(construct_input_rowset_readers());
Expand Down Expand Up @@ -268,4 +268,12 @@ int64_t Compaction::_get_input_num_rows_from_seg_grps() {
return num_rows;
}

} // namespace doris
int64_t Compaction::get_compaction_permits() {
int64_t permits = 0;
for (auto rowset : _input_rowsets) {
permits += rowset->rowset_meta()->get_compaction_score();
}
return permits;
}

} // namespace doris
4 changes: 4 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class Compaction {

virtual OLAPStatus compact() = 0;

virtual OLAPStatus prepare_compact() = 0;
virtual OLAPStatus execute_compact() = 0;

protected:
virtual OLAPStatus pick_rowsets_to_compact() = 0;
virtual std::string compaction_name() const = 0;
Expand All @@ -68,6 +71,7 @@ class Compaction {
OLAPStatus check_correctness(const Merger::Statistics& stats);
OLAPStatus find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets,
std::vector<Version>* missing_version);
int64_t get_compaction_permits();

private:
// get num rows from segment group meta of input rowsets.
Expand Down
28 changes: 27 additions & 1 deletion be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet, const std::st
CumulativeCompaction::~CumulativeCompaction() {}

OLAPStatus CumulativeCompaction::compact() {
RETURN_NOT_OK(prepare_compact());
RETURN_NOT_OK(execute_compact());

return OLAP_SUCCESS;
}

OLAPStatus CumulativeCompaction::prepare_compact() {
if (!_tablet->init_succeeded()) {
return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS;
}
Expand All @@ -52,9 +59,28 @@ OLAPStatus CumulativeCompaction::compact() {
RETURN_NOT_OK(pick_rowsets_to_compact());
TRACE("rowsets picked");
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());
_tablet->set_clone_occurred(false);

return OLAP_SUCCESS;
}

OLAPStatus CumulativeCompaction::execute_compact() {
MutexLock lock(_tablet->get_cumulative_lock(), TRY_LOCK);
if (!lock.own_lock()) {
LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name();
return OLAP_ERR_CE_TRY_CE_LOCK_ERROR;
}
TRACE("got cumulative compaction lock");

// Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
// for compaction may change. In this case, current compaction task should not be executed.
if (_tablet->get_clone_occurred()) {
_tablet->set_clone_occurred(false);
return OLAP_ERR_CUMULATIVE_CLONE_OCCURRED;
}

// 3. do cumulative compaction, merge rowsets
int64_t permits = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION);
int64_t permits = get_compaction_permits();
RETURN_NOT_OK(do_compaction(permits));
TRACE("compaction finished");

Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class CumulativeCompaction : public Compaction {

OLAPStatus compact() override;

OLAPStatus prepare_compact() override;
OLAPStatus execute_compact() override;

std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }

protected:
OLAPStatus pick_rowsets_to_compact() override;

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ enum OLAPStatus {
OLAP_ERR_BE_INVALID_NEED_MERGED_VERSIONS = -810,
OLAP_ERR_BE_ERROR_DELETE_ACTION = -811,
OLAP_ERR_BE_SEGMENTS_OVERLAPPING = -812,
OLAP_ERR_BE_CLONE_OCCURRED = -813,

// PUSH
// [-900, -1000)
Expand Down Expand Up @@ -339,6 +340,7 @@ enum OLAPStatus {
OLAP_ERR_CUMULATIVE_INVALID_NEED_MERGED_VERSIONS = -2004,
OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION = -2005,
OLAP_ERR_CUMULATIVE_MISS_VERSION = -2006,
OLAP_ERR_CUMULATIVE_CLONE_OCCURRED = -2007,

// OLAPMeta
// [-3000, -3100)
Expand Down
57 changes: 22 additions & 35 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,48 +345,35 @@ void StorageEngine::_compaction_tasks_producer_callback() {
continue;
}
for (const auto& tablet : tablets_compaction) {
int64_t permits = tablet->calc_compaction_score(compaction_type);
int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet);
if (permits == 0) {
continue;
}
if (_permit_limiter.request(permits)) {
{
// Push to _tablet_submitted_compaction before submitting task
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
_tablet_submitted_compaction[tablet->data_dir()].emplace_back(
tablet->tablet_id());
}
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
_perform_cumulative_compaction(tablet);
_permit_limiter.release(permits);
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
std::vector<TTabletId>::iterator it_tablet =
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
_tablet_submitted_compaction[tablet->data_dir()].end(),
tablet->tablet_id());
if (it_tablet !=
_tablet_submitted_compaction[tablet->data_dir()].end()) {
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
_wakeup_producer_flag = 1;
_compaction_producer_sleep_cv.notify_one();
}
});
} else {
_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
_perform_base_compaction(tablet);
_permit_limiter.release(permits);
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
std::vector<TTabletId>::iterator it_tablet =
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
_tablet_submitted_compaction[tablet->data_dir()].end(),
tablet->tablet_id());
if (it_tablet !=
_tablet_submitted_compaction[tablet->data_dir()].end()) {
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
_wakeup_producer_flag = 1;
_compaction_producer_sleep_cv.notify_one();
}
});
auto st =_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
_permit_limiter.release(permits);
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
std::vector<TTabletId>::iterator it_tablet =
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
_tablet_submitted_compaction[tablet->data_dir()].end(),
tablet->tablet_id());
if (it_tablet !=
_tablet_submitted_compaction[tablet->data_dir()].end()) {
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
_wakeup_producer_flag = 1;
_compaction_producer_sleep_cv.notify_one();
}
});
if (!st.ok()) {
_permit_limiter.release(permits);
}
}
}
Expand Down
74 changes: 14 additions & 60 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,65 +581,6 @@ void StorageEngine::_start_clean_fd_cache() {
VLOG(10) << "end clean file descritpor cache";
}

void StorageEngine::_perform_cumulative_compaction(TabletSharedPtr best_tablet) {
scoped_refptr<Trace> trace(new Trace);
MonotonicStopWatch watch;
watch.start();
SCOPED_CLEANUP({
if (watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) {
LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL);
}
});
ADOPT_TRACE(trace.get());
TRACE("start to perform cumulative compaction");

DorisMetrics::instance()->cumulative_compaction_request_total->increment(1);

std::string tracker_label = "cumulative compaction " + std::to_string(syscall(__NR_gettid));
CumulativeCompaction cumulative_compaction(best_tablet, tracker_label, _compaction_mem_tracker);

OLAPStatus res = cumulative_compaction.compact();
if (res != OLAP_SUCCESS) {
if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) {
best_tablet->set_last_cumu_compaction_failure_time(UnixMillis());
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to do cumulative compaction. res=" << res
<< ", table=" << best_tablet->full_name();
}
return;
}
best_tablet->set_last_cumu_compaction_failure_time(0);
}

void StorageEngine::_perform_base_compaction(TabletSharedPtr best_tablet) {
scoped_refptr<Trace> trace(new Trace);
MonotonicStopWatch watch;
watch.start();
SCOPED_CLEANUP({
if (watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) {
LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL);
}
});
ADOPT_TRACE(trace.get());
TRACE("start to perform base compaction");

DorisMetrics::instance()->base_compaction_request_total->increment(1);

std::string tracker_label = "base compaction " + std::to_string(syscall(__NR_gettid));
BaseCompaction base_compaction(best_tablet, tracker_label, _compaction_mem_tracker);
OLAPStatus res = base_compaction.compact();
if (res != OLAP_SUCCESS) {
best_tablet->set_last_base_compaction_failure_time(UnixMillis());
if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to init base compaction. res=" << res
<< ", table=" << best_tablet->full_name();
}
return;
}
best_tablet->set_last_base_compaction_failure_time(0);
}

OLAPStatus StorageEngine::_start_trash_sweep(double* usage) {
OLAPStatus res = OLAP_SUCCESS;
LOG(INFO) << "start trash and snapshot sweep.";
Expand Down Expand Up @@ -1038,4 +979,17 @@ bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id)
return search != _unused_rowsets.end();
}

} // namespace doris
void StorageEngine::create_cumulative_compaction(
TabletSharedPtr best_tablet, std::shared_ptr<CumulativeCompaction>& cumulative_compaction) {
std::string tracker_label = "cumulative compaction " + std::to_string(syscall(__NR_gettid));
cumulative_compaction.reset(
new CumulativeCompaction(best_tablet, tracker_label, _compaction_mem_tracker));
}

void StorageEngine::create_base_compaction(TabletSharedPtr best_tablet,
std::shared_ptr<BaseCompaction>& base_compaction) {
std::string tracker_label = "base compaction " + std::to_string(syscall(__NR_gettid));
base_compaction.reset(new BaseCompaction(best_tablet, tracker_label, _compaction_mem_tracker));
}

} // namespace doris
8 changes: 6 additions & 2 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ class StorageEngine {

void stop();

void create_cumulative_compaction(TabletSharedPtr best_tablet,
std::shared_ptr<CumulativeCompaction>& cumulative_compaction);
void create_base_compaction(TabletSharedPtr best_tablet,
std::shared_ptr<BaseCompaction>& base_compaction);

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
Expand Down Expand Up @@ -222,8 +227,7 @@ class StorageEngine {
void _parse_default_rowset_type();

void _start_clean_fd_cache();
void _perform_cumulative_compaction(TabletSharedPtr best_tablet);
void _perform_base_compaction(TabletSharedPtr best_tablet);

// 清理trash和snapshot文件,返回清理后的磁盘使用量
OLAPStatus _start_trash_sweep(double* usage);
// 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位,
Expand Down
Loading