diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index d3f6d597760824..34188910071857 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -29,6 +29,13 @@ BaseCompaction::BaseCompaction(TabletSharedPtr tablet, const std::string& label, BaseCompaction::~BaseCompaction() {} OLAPStatus BaseCompaction::compact() { + RETURN_NOT_OK(prepare_compact()); + RETURN_NOT_OK(execute_compact()); + + return OLAP_SUCCESS; +} + +OLAPStatus BaseCompaction::prepare_compact() { if (!_tablet->init_succeeded()) { return OLAP_ERR_INPUT_PARAMETER_ERROR; } @@ -44,9 +51,28 @@ OLAPStatus BaseCompaction::compact() { RETURN_NOT_OK(pick_rowsets_to_compact()); TRACE("rowsets picked"); TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size()); + _tablet->set_clone_occurred(false); + + return OLAP_SUCCESS; +} + +OLAPStatus BaseCompaction::execute_compact() { + MutexLock lock(_tablet->get_base_lock(), TRY_LOCK); + if (!lock.own_lock()) { + LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name(); + return OLAP_ERR_BE_TRY_BE_LOCK_ERROR; + } + TRACE("got base compaction lock"); + + // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked + // for compaction may change. In this case, current compaction task should not be executed. + if (_tablet->get_clone_occurred()) { + _tablet->set_clone_occurred(false); + return OLAP_ERR_BE_CLONE_OCCURRED; + } // 2. do base compaction, merge rowsets - int64_t permits = _tablet->calc_compaction_score(CompactionType::BASE_COMPACTION); + int64_t permits = get_compaction_permits(); RETURN_NOT_OK(do_compaction(permits)); TRACE("compaction finished"); diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index 82fc33782414ef..8b4c33ed19f951 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -35,6 +35,11 @@ class BaseCompaction : public Compaction { OLAPStatus compact() override; + OLAPStatus prepare_compact() override; + OLAPStatus execute_compact() override; + + std::vector get_input_rowsets() { return _input_rowsets; } + protected: OLAPStatus pick_rowsets_to_compact() override; std::string compaction_name() const override { return "base compaction"; } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index e48065b07963c5..e5534def357d4f 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -66,8 +66,8 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { _tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name() - << ", output version is=" << _output_version.first << "-" << _output_version.second - << ", score: " << permits; + << ", output_version=" << _output_version.first << "-" << _output_version.second + << ", permits: " << permits; RETURN_NOT_OK(construct_output_rowset_writer()); RETURN_NOT_OK(construct_input_rowset_readers()); @@ -268,4 +268,12 @@ int64_t Compaction::_get_input_num_rows_from_seg_grps() { return num_rows; } -} // namespace doris +int64_t Compaction::get_compaction_permits() { + int64_t permits = 0; + for (auto rowset : _input_rowsets) { + permits += rowset->rowset_meta()->get_compaction_score(); + } + return permits; +} + +} // namespace doris diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index dffbf060e49f5d..a2fedce0a928dd 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -50,6 +50,9 @@ class Compaction { virtual OLAPStatus compact() = 0; + virtual OLAPStatus prepare_compact() = 0; + virtual OLAPStatus execute_compact() = 0; + protected: virtual OLAPStatus pick_rowsets_to_compact() = 0; virtual std::string compaction_name() const = 0; @@ -68,6 +71,7 @@ class Compaction { OLAPStatus check_correctness(const Merger::Statistics& stats); OLAPStatus find_longest_consecutive_version(std::vector* rowsets, std::vector* missing_version); + int64_t get_compaction_permits(); private: // get num rows from segment group meta of input rowsets. diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index c341e0cf1fc07d..8eed07553673b7 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -31,6 +31,13 @@ CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet, const std::st CumulativeCompaction::~CumulativeCompaction() {} OLAPStatus CumulativeCompaction::compact() { + RETURN_NOT_OK(prepare_compact()); + RETURN_NOT_OK(execute_compact()); + + return OLAP_SUCCESS; +} + +OLAPStatus CumulativeCompaction::prepare_compact() { if (!_tablet->init_succeeded()) { return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS; } @@ -52,9 +59,28 @@ OLAPStatus CumulativeCompaction::compact() { RETURN_NOT_OK(pick_rowsets_to_compact()); TRACE("rowsets picked"); TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size()); + _tablet->set_clone_occurred(false); + + return OLAP_SUCCESS; +} + +OLAPStatus CumulativeCompaction::execute_compact() { + MutexLock lock(_tablet->get_cumulative_lock(), TRY_LOCK); + if (!lock.own_lock()) { + LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name(); + return OLAP_ERR_CE_TRY_CE_LOCK_ERROR; + } + TRACE("got cumulative compaction lock"); + + // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked + // for compaction may change. In this case, current compaction task should not be executed. + if (_tablet->get_clone_occurred()) { + _tablet->set_clone_occurred(false); + return OLAP_ERR_CUMULATIVE_CLONE_OCCURRED; + } // 3. do cumulative compaction, merge rowsets - int64_t permits = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION); + int64_t permits = get_compaction_permits(); RETURN_NOT_OK(do_compaction(permits)); TRACE("compaction finished"); diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h index cbfadfaf44162b..f5ada614d63042 100644 --- a/be/src/olap/cumulative_compaction.h +++ b/be/src/olap/cumulative_compaction.h @@ -33,6 +33,11 @@ class CumulativeCompaction : public Compaction { OLAPStatus compact() override; + OLAPStatus prepare_compact() override; + OLAPStatus execute_compact() override; + + std::vector get_input_rowsets() { return _input_rowsets; } + protected: OLAPStatus pick_rowsets_to_compact() override; diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 6a2d2b76437e92..ac2d595367f8c6 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -230,6 +230,7 @@ enum OLAPStatus { OLAP_ERR_BE_INVALID_NEED_MERGED_VERSIONS = -810, OLAP_ERR_BE_ERROR_DELETE_ACTION = -811, OLAP_ERR_BE_SEGMENTS_OVERLAPPING = -812, + OLAP_ERR_BE_CLONE_OCCURRED = -813, // PUSH // [-900, -1000) @@ -339,6 +340,7 @@ enum OLAPStatus { OLAP_ERR_CUMULATIVE_INVALID_NEED_MERGED_VERSIONS = -2004, OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION = -2005, OLAP_ERR_CUMULATIVE_MISS_VERSION = -2006, + OLAP_ERR_CUMULATIVE_CLONE_OCCURRED = -2007, // OLAPMeta // [-3000, -3100) diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 54c07790756ea3..b3322aac931e30 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -345,7 +345,10 @@ void StorageEngine::_compaction_tasks_producer_callback() { continue; } for (const auto& tablet : tablets_compaction) { - int64_t permits = tablet->calc_compaction_score(compaction_type); + int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet); + if (permits == 0) { + continue; + } if (_permit_limiter.request(permits)) { { // Push to _tablet_submitted_compaction before submitting task @@ -353,40 +356,24 @@ void StorageEngine::_compaction_tasks_producer_callback() { _tablet_submitted_compaction[tablet->data_dir()].emplace_back( tablet->tablet_id()); } - if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - _compaction_thread_pool->submit_func([=]() { - CgroupsMgr::apply_system_cgroup(); - _perform_cumulative_compaction(tablet); - _permit_limiter.release(permits); - std::unique_lock lock(_tablet_submitted_compaction_mutex); - std::vector::iterator it_tablet = - find(_tablet_submitted_compaction[tablet->data_dir()].begin(), - _tablet_submitted_compaction[tablet->data_dir()].end(), - tablet->tablet_id()); - if (it_tablet != - _tablet_submitted_compaction[tablet->data_dir()].end()) { - _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); - _wakeup_producer_flag = 1; - _compaction_producer_sleep_cv.notify_one(); - } - }); - } else { - _compaction_thread_pool->submit_func([=]() { - CgroupsMgr::apply_system_cgroup(); - _perform_base_compaction(tablet); - _permit_limiter.release(permits); - std::unique_lock lock(_tablet_submitted_compaction_mutex); - std::vector::iterator it_tablet = - find(_tablet_submitted_compaction[tablet->data_dir()].begin(), - _tablet_submitted_compaction[tablet->data_dir()].end(), - tablet->tablet_id()); - if (it_tablet != - _tablet_submitted_compaction[tablet->data_dir()].end()) { - _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); - _wakeup_producer_flag = 1; - _compaction_producer_sleep_cv.notify_one(); - } - }); + auto st =_compaction_thread_pool->submit_func([=]() { + CgroupsMgr::apply_system_cgroup(); + tablet->execute_compaction(compaction_type); + _permit_limiter.release(permits); + std::unique_lock lock(_tablet_submitted_compaction_mutex); + std::vector::iterator it_tablet = + find(_tablet_submitted_compaction[tablet->data_dir()].begin(), + _tablet_submitted_compaction[tablet->data_dir()].end(), + tablet->tablet_id()); + if (it_tablet != + _tablet_submitted_compaction[tablet->data_dir()].end()) { + _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); + _wakeup_producer_flag = 1; + _compaction_producer_sleep_cv.notify_one(); + } + }); + if (!st.ok()) { + _permit_limiter.release(permits); } } } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 970ca01895d27d..409cb6a9b3e305 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -581,65 +581,6 @@ void StorageEngine::_start_clean_fd_cache() { VLOG(10) << "end clean file descritpor cache"; } -void StorageEngine::_perform_cumulative_compaction(TabletSharedPtr best_tablet) { - scoped_refptr trace(new Trace); - MonotonicStopWatch watch; - watch.start(); - SCOPED_CLEANUP({ - if (watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) { - LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL); - } - }); - ADOPT_TRACE(trace.get()); - TRACE("start to perform cumulative compaction"); - - DorisMetrics::instance()->cumulative_compaction_request_total->increment(1); - - std::string tracker_label = "cumulative compaction " + std::to_string(syscall(__NR_gettid)); - CumulativeCompaction cumulative_compaction(best_tablet, tracker_label, _compaction_mem_tracker); - - OLAPStatus res = cumulative_compaction.compact(); - if (res != OLAP_SUCCESS) { - if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) { - best_tablet->set_last_cumu_compaction_failure_time(UnixMillis()); - DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); - LOG(WARNING) << "failed to do cumulative compaction. res=" << res - << ", table=" << best_tablet->full_name(); - } - return; - } - best_tablet->set_last_cumu_compaction_failure_time(0); -} - -void StorageEngine::_perform_base_compaction(TabletSharedPtr best_tablet) { - scoped_refptr trace(new Trace); - MonotonicStopWatch watch; - watch.start(); - SCOPED_CLEANUP({ - if (watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) { - LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL); - } - }); - ADOPT_TRACE(trace.get()); - TRACE("start to perform base compaction"); - - DorisMetrics::instance()->base_compaction_request_total->increment(1); - - std::string tracker_label = "base compaction " + std::to_string(syscall(__NR_gettid)); - BaseCompaction base_compaction(best_tablet, tracker_label, _compaction_mem_tracker); - OLAPStatus res = base_compaction.compact(); - if (res != OLAP_SUCCESS) { - best_tablet->set_last_base_compaction_failure_time(UnixMillis()); - if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { - DorisMetrics::instance()->base_compaction_request_failed->increment(1); - LOG(WARNING) << "failed to init base compaction. res=" << res - << ", table=" << best_tablet->full_name(); - } - return; - } - best_tablet->set_last_base_compaction_failure_time(0); -} - OLAPStatus StorageEngine::_start_trash_sweep(double* usage) { OLAPStatus res = OLAP_SUCCESS; LOG(INFO) << "start trash and snapshot sweep."; @@ -1038,4 +979,17 @@ bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) return search != _unused_rowsets.end(); } -} // namespace doris +void StorageEngine::create_cumulative_compaction( + TabletSharedPtr best_tablet, std::shared_ptr& cumulative_compaction) { + std::string tracker_label = "cumulative compaction " + std::to_string(syscall(__NR_gettid)); + cumulative_compaction.reset( + new CumulativeCompaction(best_tablet, tracker_label, _compaction_mem_tracker)); +} + +void StorageEngine::create_base_compaction(TabletSharedPtr best_tablet, + std::shared_ptr& base_compaction) { + std::string tracker_label = "base compaction " + std::to_string(syscall(__NR_gettid)); + base_compaction.reset(new BaseCompaction(best_tablet, tracker_label, _compaction_mem_tracker)); +} + +} // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 347f29b60f4311..fb7de1d5665413 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -169,6 +169,11 @@ class StorageEngine { void stop(); + void create_cumulative_compaction(TabletSharedPtr best_tablet, + std::shared_ptr& cumulative_compaction); + void create_base_compaction(TabletSharedPtr best_tablet, + std::shared_ptr& base_compaction); + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. @@ -222,8 +227,7 @@ class StorageEngine { void _parse_default_rowset_type(); void _start_clean_fd_cache(); - void _perform_cumulative_compaction(TabletSharedPtr best_tablet); - void _perform_base_compaction(TabletSharedPtr best_tablet); + // 清理trash和snapshot文件,返回清理后的磁盘使用量 OLAPStatus _start_trash_sweep(double* usage); // 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位, diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index e4dd7de984ca51..d204c8b484d6fd 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -28,6 +28,8 @@ #include #include +#include "olap/base_compaction.h" +#include "olap/cumulative_compaction.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/reader.h" @@ -38,7 +40,9 @@ #include "olap/tablet_meta_manager.h" #include "util/path_util.h" #include "util/pretty_printer.h" +#include "util/scoped_cleanup.h" #include "util/time.h" +#include "util/trace.h" namespace doris { @@ -67,7 +71,8 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir, _cumulative_point(K_INVALID_CUMULATIVE_POINT), _cumulative_compaction_type(cumulative_compaction_type), _last_record_scan_count(0), - _last_record_scan_count_timestamp(time(nullptr)) { + _last_record_scan_count_timestamp(time(nullptr)), + _is_clone_occurred(false) { // construct _timestamped_versioned_tracker from rs and stale rs meta _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas()); @@ -1320,4 +1325,107 @@ double Tablet::calculate_scan_frequency() { return scan_frequency; } -} // namespace doris +int64_t Tablet::prepare_compaction_and_calculate_permits(CompactionType compaction_type, + TabletSharedPtr tablet) { + std::vector compaction_rowsets; + if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { + scoped_refptr trace(new Trace); + MonotonicStopWatch watch; + watch.start(); + SCOPED_CLEANUP({ + if (watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) { + LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL); + } + }); + ADOPT_TRACE(trace.get()); + + TRACE("create cumulative compaction"); + StorageEngine::instance()->create_cumulative_compaction(tablet, _cumulative_compaction); + DorisMetrics::instance()->cumulative_compaction_request_total->increment(1); + OLAPStatus res = _cumulative_compaction->prepare_compact(); + if (res != OLAP_SUCCESS) { + return 0; + } + compaction_rowsets = _cumulative_compaction->get_input_rowsets(); + } else { + DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION); + scoped_refptr trace(new Trace); + MonotonicStopWatch watch; + watch.start(); + SCOPED_CLEANUP({ + if (watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) { + LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL); + } + }); + ADOPT_TRACE(trace.get()); + + TRACE("create base compaction"); + StorageEngine::instance()->create_base_compaction(tablet, _base_compaction); + DorisMetrics::instance()->base_compaction_request_total->increment(1); + OLAPStatus res = _base_compaction->prepare_compact(); + if (res != OLAP_SUCCESS) { + set_last_base_compaction_failure_time(UnixMillis()); + DorisMetrics::instance()->base_compaction_request_failed->increment(1); + if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { + LOG(WARNING) << "failed to pick rowsets for base compaction. res=" << res + << ", tablet=" << full_name(); + } + return 0; + } + compaction_rowsets = _base_compaction->get_input_rowsets(); + } + int64_t permits = 0; + for (auto rowset : compaction_rowsets) { + permits += rowset->rowset_meta()->get_compaction_score(); + } + return permits; +} + +void Tablet::execute_compaction(CompactionType compaction_type) { + if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { + scoped_refptr trace(new Trace); + MonotonicStopWatch watch; + watch.start(); + SCOPED_CLEANUP({ + if (watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) { + LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL); + } + }); + ADOPT_TRACE(trace.get()); + + TRACE("execute cumulative compaction"); + OLAPStatus res = _cumulative_compaction->execute_compact(); + if (res != OLAP_SUCCESS) { + set_last_cumu_compaction_failure_time(UnixMillis()); + DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); + LOG(WARNING) << "failed to do cumulative compaction. res=" << res + << ", tablet=" << full_name(); + return; + } + set_last_cumu_compaction_failure_time(0); + } else { + DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION); + scoped_refptr trace(new Trace); + MonotonicStopWatch watch; + watch.start(); + SCOPED_CLEANUP({ + if (watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) { + LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL); + } + }); + ADOPT_TRACE(trace.get()); + + TRACE("create base compaction"); + OLAPStatus res = _base_compaction->execute_compact(); + if (res != OLAP_SUCCESS) { + set_last_base_compaction_failure_time(UnixMillis()); + DorisMetrics::instance()->base_compaction_request_failed->increment(1); + LOG(WARNING) << "failed to do base compaction. res=" << res + << ", tablet=" << full_name(); + return; + } + set_last_base_compaction_failure_time(0); + } +} + +} // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 2f718a9d942794..83d37dd8aaf237 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -45,6 +45,8 @@ class DataDir; class Tablet; class TabletMeta; class CumulativeCompactionPolicy; +class CumulativeCompaction; +class BaseCompaction; using TabletSharedPtr = std::shared_ptr; @@ -242,6 +244,12 @@ class Tablet : public BaseTablet { double calculate_scan_frequency(); + int64_t prepare_compaction_and_calculate_permits(CompactionType compaction_type, TabletSharedPtr tablet); + void execute_compaction(CompactionType compaction_type); + + void set_clone_occurred(bool clone_occurred) { _is_clone_occurred = clone_occurred; } + bool get_clone_occurred() { return _is_clone_occurred; } + private: OLAPStatus _init_once_action(); void _print_missed_versions(const std::vector& missed_versions) const; @@ -321,6 +329,11 @@ class Tablet : public BaseTablet { // the timestamp of the last record. time_t _last_record_scan_count_timestamp; + std::shared_ptr _cumulative_compaction; + std::shared_ptr _base_compaction; + // whether clone task occurred during the tablet is in thread pool queue to wait for compaction + std::atomic _is_clone_occurred; + DISALLOW_COPY_AND_ASSIGN(Tablet); public: diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 3df40e7d6433d9..c27db9bf4b6d1a 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -611,6 +611,7 @@ OLAPStatus EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_di // clone and compaction operation should be performed sequentially tablet->obtain_base_compaction_lock(); tablet->obtain_cumulative_lock(); + tablet->set_clone_occurred(true); tablet->obtain_push_lock(); tablet->obtain_header_wrlock();