diff --git a/be/src/common/config.h b/be/src/common/config.h index b074c85cbd640c..fcad45fdab9bc6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -264,12 +264,12 @@ namespace config { // be policy // whether disable automatic compaction task CONF_mBool(disable_auto_compaction, "false"); + // check the configuration of auto compaction in seconds when auto compaction disabled + CONF_mInt32(check_auto_compaction_interval_seconds, "5"); // CONF_Int64(base_compaction_start_hour, "20"); // CONF_Int64(base_compaction_end_hour, "7"); - CONF_mInt32(base_compaction_check_interval_seconds, "60"); CONF_mInt64(base_compaction_num_cumulative_deltas, "5"); - CONF_Int32(base_compaction_num_threads_per_disk, "1"); CONF_mDouble(base_cumulative_delta_ratio, "0.3"); CONF_mInt64(base_compaction_interval_seconds_since_last_operation, "86400"); CONF_mInt32(base_compaction_write_mbytes_per_sec, "5"); @@ -296,10 +296,8 @@ namespace config { CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64"); // cumulative compaction policy: max delta file's size unit:B - CONF_mInt32(cumulative_compaction_check_interval_seconds, "10"); CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5"); CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000"); - CONF_Int32(cumulative_compaction_num_threads_per_disk, "1"); CONF_mInt64(cumulative_compaction_budgeted_bytes, "104857600"); // CONF_Int32(cumulative_compaction_write_mbytes_per_sec, "100"); // cumulative compaction skips recently published deltas in order to prevent @@ -310,13 +308,19 @@ namespace config { // if compaction of a tablet failed, this tablet should not be chosen to // compaction until this interval passes. CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min - // Too many compaction tasks may run out of memory. - // This config is to limit the max concurrency of running compaction tasks. - // -1 means no limit, and the max concurrency will be: - // C = (cumulative_compaction_num_threads_per_disk + base_compaction_num_threads_per_disk) * dir_num - // set it to larger than C will be set to equal to C. - // This config can be set to 0, which means to forbid any compaction, for some special cases. - CONF_Int32(max_compaction_concurrency, "-1"); + + // This config can be set to limit thread number in compaction thread pool. + CONF_mInt32(min_compaction_threads, "10"); + CONF_mInt32(max_compaction_threads, "10"); + + // The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction. + CONF_mInt64(total_permits_for_compaction_score, "10000"); + + // Compaction task number per disk. + CONF_mInt32(compaction_task_num_per_disk, "2"); + + // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. + CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9"); // Threshold to logging compaction trace, in seconds. CONF_mInt32(base_compaction_trace_threshold, "10"); diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 13c11a073c3075..3768b5eb8217aa 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -34,6 +34,7 @@ add_library(Olap STATIC bloom_filter_writer.cpp byte_buffer.cpp compaction.cpp + compaction_permit_limiter.cpp comparison_predicate.cpp compress.cpp cumulative_compaction.cpp diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index bb2404ca3be0b8..a38c244f478daf 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -45,7 +45,8 @@ OLAPStatus BaseCompaction::compact() { TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size()); // 2. do base compaction, merge rowsets - RETURN_NOT_OK(do_compaction()); + int64_t permits = _tablet->calc_base_compaction_score(); + RETURN_NOT_OK(do_compaction(permits)); TRACE("compaction finished"); // 3. set state to success diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 67b56f6e763172..aa0f0b4523b94f 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -25,8 +25,6 @@ using std::vector; namespace doris { -Semaphore Compaction::_concurrency_sem; - Compaction::Compaction(TabletSharedPtr tablet, const std::string& label, const std::shared_ptr& parent_tracker) : _mem_tracker(MemTracker::CreateTracker(-1, label, parent_tracker)), _readers_tracker(MemTracker::CreateTracker(-1, "readers tracker", _mem_tracker)), @@ -37,20 +35,17 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label, const s Compaction::~Compaction() {} -OLAPStatus Compaction::init(int concurreny) { - _concurrency_sem.set_count(concurreny); - return OLAP_SUCCESS; -} - -OLAPStatus Compaction::do_compaction() { - _concurrency_sem.wait(); - TRACE("got concurrency lock and start to do compaction"); - OLAPStatus st = do_compaction_impl(); - _concurrency_sem.signal(); +OLAPStatus Compaction::do_compaction(int64_t permits) { + TRACE("start to do compaction"); + _tablet->data_dir()->disks_compaction_score_increment(permits); + _tablet->data_dir()->disks_compaction_num_increment(1); + OLAPStatus st = do_compaction_impl(permits); + _tablet->data_dir()->disks_compaction_score_increment(-permits); + _tablet->data_dir()->disks_compaction_num_increment(-1); return st; } -OLAPStatus Compaction::do_compaction_impl() { +OLAPStatus Compaction::do_compaction_impl(int64_t permits) { OlapStopWatch watch; // 1. prepare input and output parameters @@ -68,7 +63,8 @@ OLAPStatus Compaction::do_compaction_impl() { _tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name() - << ", output version is=" << _output_version.first << "-" << _output_version.second; + << ", output version is=" << _output_version.first << "-" << _output_version.second + << ", score: " << permits; RETURN_NOT_OK(construct_output_rowset_writer()); RETURN_NOT_OK(construct_input_rowset_readers()); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 6c4b438aaeb260..f43bc6f1dcc2a4 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -49,15 +49,13 @@ class Compaction { virtual OLAPStatus compact() = 0; - static OLAPStatus init(int concurreny); - protected: virtual OLAPStatus pick_rowsets_to_compact() = 0; virtual std::string compaction_name() const = 0; virtual ReaderType compaction_type() const = 0; - OLAPStatus do_compaction(); - OLAPStatus do_compaction_impl(); + OLAPStatus do_compaction(int64_t permits); + OLAPStatus do_compaction_impl(int64_t permits); void modify_rowsets(); OLAPStatus gc_unused_rowsets(); @@ -68,9 +66,6 @@ class Compaction { OLAPStatus check_version_continuity(const std::vector& rowsets); OLAPStatus check_correctness(const Merger::Statistics& stats); - // semaphore used to limit the concurrency of running compaction tasks - static Semaphore _concurrency_sem; - private: // get num rows from segment group meta of input rowsets. // return -1 if these are not alpha rowsets. diff --git a/be/src/olap/compaction_permit_limiter.cpp b/be/src/olap/compaction_permit_limiter.cpp new file mode 100644 index 00000000000000..b0a08e6679db94 --- /dev/null +++ b/be/src/olap/compaction_permit_limiter.cpp @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/compaction_permit_limiter.h" + +namespace doris { + +CompactionPermitLimiter::CompactionPermitLimiter() : _used_permits(0) {} + +bool CompactionPermitLimiter::request(int64_t permits) { + if (permits > config::total_permits_for_compaction_score) { + // when tablet's compaction score is larger than "config::total_permits_for_compaction_score", + // it's necessary to do compaction for this tablet because this tablet will not get "permits" + // anyway. otherwise, compaction task for this tablet will not be executed forever. + std::unique_lock lock(_permits_mutex); + _permits_cv.wait(lock, [=] { + return _used_permits == 0 || + _used_permits + permits <= config::total_permits_for_compaction_score; + }); + } else { + if (_used_permits + permits > config::total_permits_for_compaction_score) { + std::unique_lock lock(_permits_mutex); + _permits_cv.wait(lock, [=] { + return _used_permits + permits <= config::total_permits_for_compaction_score; + }); + } + } + _used_permits += permits; + return true; +} + +void CompactionPermitLimiter::release(int64_t permits) { + _used_permits -= permits; + _permits_cv.notify_one(); +} +} // namespace doris diff --git a/be/src/olap/compaction_permit_limiter.h b/be/src/olap/compaction_permit_limiter.h new file mode 100644 index 00000000000000..b8a216d361ff33 --- /dev/null +++ b/be/src/olap/compaction_permit_limiter.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/config.h" +#include "olap/utils.h" + +namespace doris { + +/* + This class is used to control compaction permission. To some extent, it can be used to control memory consumption. + "permits" should be applied before a compaction task can execute. When the sum of "permites" held by executing + compaction tasks reaches a threshold, subsequent compaction task will be no longer allowed, until some "permits" + are released by some finished compaction tasks. "compaction score" for tablet is used as "permits" here. +*/ +class CompactionPermitLimiter { +public: + CompactionPermitLimiter(); + virtual ~CompactionPermitLimiter() {} + + bool request(int64_t permits); + + void release(int64_t permits); + +private: + // sum of "permits" held by executing compaction tasks currently + AtomicInt64 _used_permits; + std::mutex _permits_mutex; + std::condition_variable _permits_cv; +}; +} // namespace doris diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 6611e5102be336..c2c8755c694e6e 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -53,7 +53,8 @@ OLAPStatus CumulativeCompaction::compact() { TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size()); // 3. do cumulative compaction, merge rowsets - RETURN_NOT_OK(do_compaction()); + int64_t permits = _tablet->calc_cumulative_compaction_score(); + RETURN_NOT_OK(do_compaction(permits)); TRACE("compaction finished"); // 4. set state to success diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 42afdd8232b610..c20849b05fa598 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -59,6 +59,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_total_capacity, MetricUnit::BYTES); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_avail_capacity, MetricUnit::BYTES); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_data_used_capacity, MetricUnit::BYTES); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_state, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_compaction_score, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_compaction_num, MetricUnit::NOUNIT); static const char* const kMtabPath = "/etc/mtab"; static const char* const kTestFilePath = "/.testfile"; @@ -83,6 +85,8 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes, INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity); INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_data_used_capacity); INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_score); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_num); } DataDir::~DataDir() { @@ -986,7 +990,7 @@ void DataDir::update_user_data_size(int64_t size) { bool DataDir::reach_capacity_limit(int64_t incoming_data_size) { double used_pct = (_disk_capacity_bytes - _available_bytes + incoming_data_size) / (double)_disk_capacity_bytes; - int64_t left_bytes = _disk_capacity_bytes - _available_bytes - incoming_data_size; + int64_t left_bytes = _available_bytes - incoming_data_size; if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 && left_bytes <= config::storage_flood_stage_left_capacity_bytes) { @@ -996,4 +1000,12 @@ bool DataDir::reach_capacity_limit(int64_t incoming_data_size) { } return false; } + +void DataDir::disks_compaction_score_increment(int64_t delta) { + disks_compaction_score->increment(delta); +} + +void DataDir::disks_compaction_num_increment(int64_t delta) { + disks_compaction_num->increment(delta); +} } // namespace doris diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 0fc8b4a0969482..898f644f122581 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -128,6 +128,10 @@ class DataDir { std::set tablet_set() { return _tablet_set; } + void disks_compaction_score_increment(int64_t delta); + + void disks_compaction_num_increment(int64_t delta); + private: std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; } Status _init_cluster_id(); @@ -201,6 +205,8 @@ class DataDir { IntGauge* disks_avail_capacity; IntGauge* disks_data_used_capacity; IntGauge* disks_state; + IntGauge* disks_compaction_score; + IntGauge* disks_compaction_num; }; } // namespace doris diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 3f860f4f5971ec..57ed54d58a75c0 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -27,12 +27,12 @@ #include #include +#include "agent/cgroups_mgr.h" #include "common/status.h" #include "olap/cumulative_compaction.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/storage_engine.h" -#include "agent/cgroups_mgr.h" #include "util/time.h" using std::string; @@ -63,52 +63,28 @@ Status StorageEngine::start_bg_threads() { &_disk_stat_monitor_thread)); LOG(INFO) << "disk stat monitor thread started"; - + // convert store map to vector std::vector data_dirs; for (auto& tmp_store : _store_map) { data_dirs.push_back(tmp_store.second); } - int32_t data_dir_num = data_dirs.size(); // check cumulative compaction config _check_cumulative_compaction_config(); - // base and cumulative compaction threads - int32_t base_compaction_num_threads_per_disk = std::max(1, config::base_compaction_num_threads_per_disk); - int32_t cumulative_compaction_num_threads_per_disk = std::max(1, config::cumulative_compaction_num_threads_per_disk); - int32_t base_compaction_num_threads = base_compaction_num_threads_per_disk * data_dir_num; - int32_t cumulative_compaction_num_threads = cumulative_compaction_num_threads_per_disk * data_dir_num; - // calc the max concurrency of compaction tasks - int32_t max_compaction_concurrency = config::max_compaction_concurrency; - if (max_compaction_concurrency < 0 - || max_compaction_concurrency > base_compaction_num_threads + cumulative_compaction_num_threads + 1) { - // reserve 1 thread for manual execution - max_compaction_concurrency = base_compaction_num_threads + cumulative_compaction_num_threads + 1; - } - Compaction::init(max_compaction_concurrency); - - _base_compaction_threads.reserve(base_compaction_num_threads); - for (uint32_t i = 0; i < base_compaction_num_threads; ++i) { - scoped_refptr base_compaction_thread; - RETURN_IF_ERROR( - Thread::create("StorageEngine", "base_compaction_thread", - [this, i, data_dir_num, data_dirs]() { this->_base_compaction_thread_callback(data_dirs[i % data_dir_num]); }, - &base_compaction_thread)); - _base_compaction_threads.emplace_back(base_compaction_thread); - } - LOG(INFO) << "base compaction threads started. number: " << base_compaction_num_threads; + int32_t max_thread_num = config::max_compaction_threads; + int32_t min_thread_num = config::min_compaction_threads; + ThreadPoolBuilder("CompactionTaskThreadPool") + .set_min_threads(min_thread_num) + .set_max_threads(max_thread_num) + .build(&_compaction_thread_pool); - _cumulative_compaction_threads.reserve(cumulative_compaction_num_threads); - for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) { - scoped_refptr cumulative_compaction_thread; - RETURN_IF_ERROR( - Thread::create("StorageEngine", "cumulative_compaction_thread", - [this, i, data_dir_num, data_dirs]() { this->_cumulative_compaction_thread_callback(data_dirs[i % data_dir_num]); }, - &cumulative_compaction_thread)); - _cumulative_compaction_threads.emplace_back(cumulative_compaction_thread); - } - LOG(INFO) << "cumulative compaction threads started. number: " << cumulative_compaction_num_threads; + // compaction tasks producer thread + RETURN_IF_ERROR(Thread::create("StorageEngine", "compaction_tasks_producer_thread", + [this]() { this->_compaction_tasks_producer_callback(); }, + &_compaction_tasks_producer_thread)); + LOG(INFO) << "compaction tasks producer thread started"; // tablet checkpoint thread for (auto data_dir : data_dirs) { @@ -169,33 +145,6 @@ void StorageEngine::_fd_cache_clean_callback() { } } -void StorageEngine::_base_compaction_thread_callback(DataDir* data_dir) { -#ifdef GOOGLE_PROFILER - ProfilerRegisterThread(); -#endif - - int32_t interval = config::base_compaction_check_interval_seconds; - do { - if (!config::disable_auto_compaction) { - // must be here, because this thread is start on start and - // cgroup is not initialized at this time - // add tid to cgroup - CgroupsMgr::apply_system_cgroup(); - if (!data_dir->reach_capacity_limit(0)) { - _perform_base_compaction(data_dir); - } - } - - interval = config::base_compaction_check_interval_seconds; - if (interval <= 0) { - OLAP_LOG_WARNING("base compaction check interval config is illegal: [%d], " - "force set to 1", interval); - interval = 1; - } - - } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); -} - void StorageEngine::_garbage_sweeper_thread_callback() { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); @@ -286,33 +235,6 @@ void StorageEngine::_check_cumulative_compaction_config() { } } -void StorageEngine::_cumulative_compaction_thread_callback(DataDir* data_dir) { -#ifdef GOOGLE_PROFILER - ProfilerRegisterThread(); -#endif - LOG(INFO) << "try to start cumulative compaction process!"; - - int32_t interval = config::cumulative_compaction_check_interval_seconds; - do { - if (!config::disable_auto_compaction) { - // must be here, because this thread is start on start and - // cgroup is not initialized at this time - // add tid to cgroup - CgroupsMgr::apply_system_cgroup(); - if (!data_dir->reach_capacity_limit(0)) { - _perform_cumulative_compaction(data_dir); - } - } - - interval = config::cumulative_compaction_check_interval_seconds; - if (interval <= 0) { - LOG(WARNING) << "cumulative compaction check interval config is illegal:" << interval - << "will be forced set to one"; - interval = 1; - } - } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); -} - void StorageEngine::_unused_rowset_monitor_thread_callback() { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); @@ -340,7 +262,7 @@ void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) { do { LOG(INFO) << "try to perform path gc by tablet!"; data_dir->perform_path_gc_by_tablet(); - + LOG(INFO) << "try to perform path gc by rowsetid!"; data_dir->perform_path_gc_by_rowsetid(); @@ -391,4 +313,107 @@ void StorageEngine::_tablet_checkpoint_callback(DataDir* data_dir) { } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } -} // namespace doris +void StorageEngine::_compaction_tasks_producer_callback() { +#ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); +#endif + LOG(INFO) << "try to start compaction producer process!"; + + std::vector tablet_submitted; + std::vector data_dirs; + for (auto& tmp_store : _store_map) { + data_dirs.push_back(tmp_store.second); + _tablet_submitted_compaction[tmp_store.second] = tablet_submitted; + } + + int round = 0; + CompactionType compaction_type; + while (true) { + if (!config::disable_auto_compaction) { + if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { + compaction_type = CompactionType::CUMULATIVE_COMPACTION; + round++; + } else { + compaction_type = CompactionType::BASE_COMPACTION; + round = 0; + } + vector tablets_compaction = + _compaction_tasks_generator(compaction_type, data_dirs); + if (tablets_compaction.size() == 0) { + _wakeup_producer_flag = 0; + std::unique_lock lock(_compaction_producer_sleep_mutex); + // It is necessary to wake up the thread on timeout to prevent deadlock + // in case of no running compaction task. + _compaction_producer_sleep_cv.wait_for(lock, std::chrono::milliseconds(2000), + [=] { return _wakeup_producer_flag == 1; }); + continue; + } + for (const auto& tablet : tablets_compaction) { + int64_t permits = tablet->calc_compaction_score(compaction_type); + if (_permit_limiter.request(permits)) { + if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { + _compaction_thread_pool->submit_func([=]() { + CgroupsMgr::apply_system_cgroup(); + _perform_cumulative_compaction(tablet); + _permit_limiter.release(permits); + std::unique_lock lock(_tablet_submitted_compaction_mutex); + vector::iterator it_tablet = + find(_tablet_submitted_compaction[tablet->data_dir()].begin(), + _tablet_submitted_compaction[tablet->data_dir()].end(), + tablet->tablet_id()); + if (it_tablet != + _tablet_submitted_compaction[tablet->data_dir()].end()) { + _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); + _wakeup_producer_flag = 1; + _compaction_producer_sleep_cv.notify_one(); + } + }); + } else { + _compaction_thread_pool->submit_func([=]() { + CgroupsMgr::apply_system_cgroup(); + _perform_base_compaction(tablet); + _permit_limiter.release(permits); + std::unique_lock lock(_tablet_submitted_compaction_mutex); + vector::iterator it_tablet = + find(_tablet_submitted_compaction[tablet->data_dir()].begin(), + _tablet_submitted_compaction[tablet->data_dir()].end(), + tablet->tablet_id()); + if (it_tablet != + _tablet_submitted_compaction[tablet->data_dir()].end()) { + _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); + _wakeup_producer_flag = 1; + _compaction_producer_sleep_cv.notify_one(); + } + }); + } + std::unique_lock lock(_tablet_submitted_compaction_mutex); + _tablet_submitted_compaction[tablet->data_dir()].emplace_back( + tablet->tablet_id()); + } + } + } else { + sleep(config::check_auto_compaction_interval_seconds); + } + } +} + +vector StorageEngine::_compaction_tasks_generator( + CompactionType compaction_type, std::vector data_dirs) { + vector tablets_compaction; + std::random_shuffle(data_dirs.begin(), data_dirs.end()); + for (auto data_dir : data_dirs) { + std::unique_lock lock(_tablet_submitted_compaction_mutex); + if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk) { + continue; + } + if (!data_dir->reach_capacity_limit(0)) { + TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction( + compaction_type, data_dir, _tablet_submitted_compaction[data_dir]); + if (tablet != nullptr) { + tablets_compaction.emplace_back(tablet); + } + } + } + return tablets_compaction; +} +} // namespace doris diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index a07d6b9eb93558..c2cfcc1627fa6e 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -140,6 +140,7 @@ StorageEngine::~StorageEngine() { DEREGISTER_HOOK_METRIC(unused_rowsets_count); DEREGISTER_HOOK_METRIC(compaction_mem_current_consumption); _clear(); + _compaction_thread_pool->shutdown(); } void StorageEngine::load_data_dirs(const std::vector& data_dirs) { @@ -578,7 +579,7 @@ void StorageEngine::_start_clean_fd_cache() { VLOG(10) << "end clean file descritpor cache"; } -void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { +void StorageEngine::_perform_cumulative_compaction(TabletSharedPtr best_tablet) { scoped_refptr trace(new Trace); MonotonicStopWatch watch; watch.start(); @@ -589,12 +590,6 @@ void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { }); ADOPT_TRACE(trace.get()); TRACE("start to perform cumulative compaction"); - TabletSharedPtr best_tablet = _tablet_manager->find_best_tablet_to_compaction( - CompactionType::CUMULATIVE_COMPACTION, data_dir); - if (best_tablet == nullptr) { - return; - } - TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id); DorisMetrics::instance()->cumulative_compaction_request_total->increment(1); @@ -603,8 +598,8 @@ void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { OLAPStatus res = cumulative_compaction.compact(); if (res != OLAP_SUCCESS) { - best_tablet->set_last_cumu_compaction_failure_time(UnixMillis()); if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) { + best_tablet->set_last_cumu_compaction_failure_time(UnixMillis()); DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); LOG(WARNING) << "failed to do cumulative compaction. res=" << res << ", table=" << best_tablet->full_name(); @@ -614,7 +609,7 @@ void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { best_tablet->set_last_cumu_compaction_failure_time(0); } -void StorageEngine::_perform_base_compaction(DataDir* data_dir) { +void StorageEngine::_perform_base_compaction(TabletSharedPtr best_tablet) { scoped_refptr trace(new Trace); MonotonicStopWatch watch; watch.start(); @@ -625,12 +620,6 @@ void StorageEngine::_perform_base_compaction(DataDir* data_dir) { }); ADOPT_TRACE(trace.get()); TRACE("start to perform base compaction"); - TabletSharedPtr best_tablet = _tablet_manager->find_best_tablet_to_compaction( - CompactionType::BASE_COMPACTION, data_dir); - if (best_tablet == nullptr) { - return; - } - TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id); DorisMetrics::instance()->base_compaction_request_total->increment(1); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 0b3aacfee3e81e..f5707239d7649e 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -18,18 +18,18 @@ #ifndef DORIS_BE_SRC_OLAP_STORAGE_ENGINE_H #define DORIS_BE_SRC_OLAP_STORAGE_ENGINE_H +#include #include #include #include #include -#include #include #include -#include #include +#include -#include #include +#include #include "agent/status.h" #include "common/status.h" @@ -37,20 +37,22 @@ #include "gen_cpp/BackendService_types.h" #include "gen_cpp/MasterService_types.h" #include "gutil/ref_counted.h" +#include "olap/compaction_permit_limiter.h" +#include "olap/fs/fs_util.h" #include "olap/olap_common.h" #include "olap/olap_define.h" -#include "olap/tablet.h" #include "olap/olap_meta.h" #include "olap/options.h" +#include "olap/rowset/rowset_id_generator.h" +#include "olap/tablet.h" #include "olap/tablet_manager.h" #include "olap/tablet_sync_service.h" -#include "olap/txn_manager.h" #include "olap/task/engine_task.h" -#include "olap/rowset/rowset_id_generator.h" -#include "olap/fs/fs_util.h" +#include "olap/txn_manager.h" #include "runtime/heartbeat_flags.h" #include "util/countdown_latch.h" #include "util/thread.h" +#include "util/threadpool.h" namespace doris { @@ -211,12 +213,8 @@ class StorageEngine { // unused rowset monitor thread void _unused_rowset_monitor_thread_callback(); - // base compaction thread process function - void _base_compaction_thread_callback(DataDir* data_dir); // check cumulative compaction config void _check_cumulative_compaction_config(); - // cumulative process function - void _cumulative_compaction_thread_callback(DataDir* data_dir); // garbage sweep thread process function. clear snapshot and trash folder void _garbage_sweeper_thread_callback(); @@ -238,8 +236,8 @@ class StorageEngine { void _parse_default_rowset_type(); void _start_clean_fd_cache(); - void _perform_cumulative_compaction(DataDir* data_dir); - void _perform_base_compaction(DataDir* data_dir); + void _perform_cumulative_compaction(TabletSharedPtr best_tablet); + void _perform_base_compaction(TabletSharedPtr best_tablet); // 清理trash和snapshot文件,返回清理后的磁盘使用量 OLAPStatus _start_trash_sweep(double *usage); // 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位, @@ -248,6 +246,9 @@ class StorageEngine { // 重新加载数据。 void _start_disk_stat_monitor(); + void _compaction_tasks_producer_callback(); + vector _compaction_tasks_generator(CompactionType compaction_type, std::vector data_dirs); + private: struct CompactionCandidate { CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) : @@ -313,6 +314,7 @@ class StorageEngine { std::vector> _base_compaction_threads; // threads to check cumulative std::vector> _cumulative_compaction_threads; + scoped_refptr _compaction_tasks_producer_thread; scoped_refptr _fd_cache_clean_thread; // threads to clean all file descriptor not actively in use std::vector> _path_gc_threads; @@ -342,6 +344,18 @@ class StorageEngine { HeartbeatFlags* _heartbeat_flags; + std::unique_ptr _compaction_thread_pool; + + CompactionPermitLimiter _permit_limiter; + + std::mutex _tablet_submitted_compaction_mutex; + std::map> _tablet_submitted_compaction; + + AtomicInt32 _wakeup_producer_flag; + + std::mutex _compaction_producer_sleep_mutex; + std::condition_variable _compaction_producer_sleep_cv; + DISALLOW_COPY_AND_ASSIGN(StorageEngine); }; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index ea841642c0504b..f2780522fd44c8 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -741,6 +741,15 @@ bool Tablet::can_do_compaction() { return true; } +const uint32_t Tablet::calc_compaction_score(CompactionType compaction_type) const { + if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { + return calc_cumulative_compaction_score(); + } else { + DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION); + return calc_base_compaction_score(); + } +} + const uint32_t Tablet::calc_cumulative_compaction_score() const { uint32_t score = 0; _cumulative_compaction_policy->calc_cumulative_compaction_score( diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index c32daad3b51008..ddf2248e6ebfb1 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -163,6 +163,7 @@ class Tablet : public BaseTablet { // operation for compaction bool can_do_compaction(); + const uint32_t calc_compaction_score(CompactionType compaction_type) const; const uint32_t calc_cumulative_compaction_score() const; const uint32_t calc_base_compaction_score() const; static void compute_version_hash_from_rowsets(const std::vector& rowsets, diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 6260f2163184e3..bf7d09890dac89 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -679,8 +679,9 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) { result->__set_tablets_stats(_tablet_stat_cache); } -TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType compaction_type, - DataDir* data_dir) { +TabletSharedPtr TabletManager::find_best_tablet_to_compaction( + CompactionType compaction_type, DataDir* data_dir, + vector &tablet_submitted_compaction) { int64_t now_ms = UnixMillis(); const string& compaction_type_str = compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative"; uint32_t highest_score = 0; @@ -690,6 +691,12 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com tablet_map_t& tablet_map = _tablet_map_array[i]; for (tablet_map_t::value_type& table_ins : tablet_map){ for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) { + vector::iterator it_tablet = + find(tablet_submitted_compaction.begin(), tablet_submitted_compaction.end(), + tablet_ptr->tablet_id()); + if (it_tablet != tablet_submitted_compaction.end()) { + continue; + } AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task(); if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED @@ -738,7 +745,6 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com } } - uint32_t table_score = 0; { ReadLock rdlock(tablet_ptr->get_header_lock_ptr()); @@ -757,7 +763,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com } if (best_tablet != nullptr) { - LOG(INFO) << "Found the best tablet for compaction. " + VLOG(1) << "Found the best tablet for compaction. " << "compaction_type=" << compaction_type_str << ", tablet_id=" << best_tablet->tablet_id() << ", highest_score=" << highest_score; diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index ea4de8c69e96c7..dda2f9ef207990 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -69,7 +69,9 @@ class TabletManager { OLAPStatus drop_tablets_on_error_root_path(const std::vector& tablet_info_vec); - TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type, DataDir* data_dir); + TabletSharedPtr find_best_tablet_to_compaction( + CompactionType compaction_type, DataDir* data_dir, + vector &tablet_submitted_compaction); TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash, bool include_deleted = false, std::string* err = nullptr);