From 9c97588cbd55de1a62cf74197bbaf98aeb6239e4 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Thu, 26 May 2022 21:58:32 +0800 Subject: [PATCH 01/11] [Feature] compaction quickly for small data import #9791 --- be/src/agent/task_worker_pool.cpp | 17 ++++++- be/src/common/config.h | 14 ++++-- be/src/common/status.h | 6 +-- be/src/olap/compaction.cpp | 19 ++++++++ be/src/olap/compaction.h | 1 + be/src/olap/delta_writer.cpp | 11 +++-- be/src/olap/olap_server.cpp | 17 +++++++ be/src/olap/storage_engine.h | 4 ++ be/src/olap/tablet.cpp | 45 ++++++++++++++++++- be/src/olap/tablet.h | 7 ++- .../olap/task/engine_publish_version_task.cpp | 10 ++++- .../olap/task/engine_publish_version_task.h | 4 +- 12 files changed, 139 insertions(+), 16 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 0a30aad20adb70..e1b249dcacfd92 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -691,11 +691,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature; std::vector error_tablet_ids; + std::vector succ_tablet_ids; uint32_t retry_time = 0; Status res = Status::OK(); while (retry_time < PUBLISH_VERSION_MAX_RETRY) { error_tablet_ids.clear(); - EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids); + EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, + &succ_tablet_ids); res = _env->storage_engine()->execute_task(&engine_task); if (res.ok()) { break; @@ -717,7 +719,18 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { << ", error_code=" << res; finish_task_request.__set_error_tablet_ids(error_tablet_ids); } else { - LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature; + for (int i = 0; i < succ_tablet_ids.size(); i++) { + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(succ_tablet_ids[i]); + if (tablet != nullptr) { + StorageEngine::instance()->submit_samll_compaction_task(tablet); + LOG(INFO) << "tirgger samll compaction succ" << succ_tablet_ids[i]; + } else { + LOG(WARNING) << "tirgger samll compaction failed" << succ_tablet_ids[i]; + } + } + LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature + << ", size:" << succ_tablet_ids.size(); } res.to_thrift(&finish_task_request.task_status); diff --git a/be/src/common/config.h b/be/src/common/config.h index fe40be22a8afb5..f7b9b601218afc 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -226,9 +226,6 @@ CONF_Int64(index_stream_cache_capacity, "10737418240"); // Cache for storage page size CONF_String(storage_page_cache_limit, "20%"); -// Shard size for page cache, the value must be power of two. -// It's recommended to set it to a value close to the number of BE cores in order to reduce lock contentions. -CONF_Int32(storage_page_cache_shard_size, "16"); // Percentage for index page cache // all storage page cache will be divided into data_page_cache and index_page_cache CONF_Int32(index_page_cache_percentage, "10"); @@ -294,12 +291,16 @@ CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds // This config can be set to limit thread number in compaction thread pool. CONF_mInt32(max_compaction_threads, "10"); + // This config can be set to limit thread number in convert rowset thread pool. CONF_mInt32(convert_rowset_thread_num, "0"); // initial sleep interval in seconds of scan alpha rowset CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3"); +// This config can be set to limit thread number in smallcompaction thread pool. +CONF_mInt32(small_compaction_max_threads, "10"); + // Thread count to do tablet meta checkpoint, -1 means use the data directories count. CONF_Int32(max_meta_checkpoint_threads, "-1"); @@ -741,6 +742,13 @@ CONF_Int32(parquet_reader_max_buffer_size, "50"); // if it is lower than a specific threshold, the predicate will be disabled. CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); +// For continuous versions that rows less than small_compaction_max_rows will trigger compaction quickly +// if set to 0 means turn off this feature +CONF_Int32(small_compaction_max_rows, "1000"); + +// min compaction versions +CONF_Int32(small_compaction_batch_size, "20"); + } // namespace config } // namespace doris diff --git a/be/src/common/status.h b/be/src/common/status.h index cca23836706924..31a58a28df210a 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -24,7 +24,7 @@ namespace doris { #define APPLY_FOR_ERROR_CODES(M) \ M(OLAP_SUCCESS, 0, "", false) \ M(OLAP_ERR_OTHER_ERROR, -1, "", true) \ - M(OLAP_REQUEST_FAILED, -2, "", true) \ + M(OLAP_REQUEST_FAILED, -2, "", false) \ M(OLAP_ERR_OS_ERROR, -100, "", true) \ M(OLAP_ERR_DIR_NOT_EXIST, -101, "", true) \ M(OLAP_ERR_FILE_NOT_EXIST, -102, "", true) \ @@ -176,8 +176,8 @@ namespace doris { M(OLAP_ERR_HEADER_LOAD_JSON_HEADER, -1410, "", true) \ M(OLAP_ERR_HEADER_INIT_FAILED, -1411, "", true) \ M(OLAP_ERR_HEADER_PB_PARSE_FAILED, -1412, "", true) \ - M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", true) \ - M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", true) \ + M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", false) \ + M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", false) \ M(OLAP_ERR_SCHEMA_SCHEMA_FIELD_INVALID, -1501, "", true) \ M(OLAP_ERR_ALTER_MULTI_TABLE_ERR, -1600, "", true) \ M(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS, -1601, "", true) \ diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index c279ac7d62737b..1da0d080d77207 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -55,6 +55,25 @@ Status Compaction::execute_compact() { return st; } +Status Compaction::samll_rowsets_compact() { + std::unique_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); + if (!lock.owns_lock()) { + LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name(); + return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR); + } + _input_rowsets.clear(); + int vertion_count = _tablet->version_count(); + int64_t now = UnixMillis(); + _tablet->pick_samll_verson_rowsets(&_input_rowsets); + if (_input_rowsets.size() >= 5) { + do_compaction(0); + LOG(INFO) << "samll_rowsets_compact,before_versions:" << vertion_count + << ", after_versions:" << _tablet->version_count() + << ", cost:" << (UnixMillis() - now) / 1000; + } + return Status::OK(); +} + Status Compaction::do_compaction(int64_t permits) { TRACE("start to do compaction"); _tablet->data_dir()->disks_compaction_score_increment(permits); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index bd37cae275a642..603bf1f40c597a 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -48,6 +48,7 @@ class Compaction { // This is only for http CompactionAction Status compact(); + Status samll_rowsets_compact(); virtual Status prepare_compact() = 0; Status execute_compact(); diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 132a79a390afd5..7ededcea491f7b 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -17,6 +17,8 @@ #include "olap/delta_writer.h" +#include "olap/base_compaction.h" +#include "olap/cumulative_compaction.h" #include "olap/data_dir.h" #include "olap/memtable.h" #include "olap/memtable_flush_executor.h" @@ -98,9 +100,12 @@ Status DeltaWriter::init() { MemTracker::create_tracker(-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id())); // check tablet version number if (_tablet->version_count() > config::max_tablet_version_num) { - LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count() - << ", exceed limit: " << config::max_tablet_version_num - << ". tablet: " << _tablet->full_name(); + //trigger samll compaction + StorageEngine::instance()->submit_samll_compaction_task(_tablet); + LOG(WARNING) << "failed to init delta writer. version count: " + << _tablet->version_count() + << ", exceed limit: " << config::max_tablet_version_num + << ". tablet: " << _tablet->full_name(); return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION); } diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 2ad264d2bcce0a..56319b2c6ca927 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -88,6 +88,11 @@ Status StorageEngine::start_bg_threads() { LOG(INFO) << "alpha rowset scan thread started"; } + ThreadPoolBuilder("CompactionTaskThreadPool") + .set_min_threads(max_thread_num) + .set_max_threads(max_thread_num) + .build(&_samll_compaction_thread_pool); + // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( "StorageEngine", "compaction_tasks_producer_thread", @@ -661,4 +666,16 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, return _submit_compaction_task(tablet, compaction_type); } +Status StorageEngine::_handle_samll_compaction(TabletSharedPtr tablet) { + CumulativeCompaction compact(tablet); + compact.samll_rowsets_compact(); + return Status::OK(); +} + +Status StorageEngine::submit_samll_compaction_task(TabletSharedPtr tablet) { + _samll_compaction_thread_pool->submit_func( + std::bind(&StorageEngine::_handle_samll_compaction, this, tablet)); + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index ce33223c5aaa1e..db20e29813bedb 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -193,6 +193,7 @@ class StorageEngine { void check_cumulative_compaction_config(); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); + Status submit_samll_compaction_task(TabletSharedPtr tablet); private: // Instance should be inited from `static open()` @@ -270,6 +271,8 @@ class StorageEngine { Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); + Status _handle_samll_compaction(TabletSharedPtr); + private: struct CompactionCandidate { CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) @@ -378,6 +381,7 @@ class StorageEngine { HeartbeatFlags* _heartbeat_flags; std::unique_ptr _compaction_thread_pool; + std::unique_ptr _samll_compaction_thread_pool; scoped_refptr _alpha_rowset_scan_thread; std::unique_ptr _convert_rowset_thread_pool; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index df7f02a1f971da..2695d7376e27ee 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -870,10 +870,53 @@ void Tablet::calculate_cumulative_point() { if (ret_cumulative_point == K_INVALID_CUMULATIVE_POINT) { return; } - set_cumulative_layer_point(ret_cumulative_point); } +//1.小版本rowset +//2.不改变cp,cp值的更新有cc任务来执行 +//3.找到最大的,连续小版本rowset +Status Tablet::pick_samll_verson_rowsets(std::vector* input_rowsets) { + int max_series_num = 1000; + int max_rows = config::small_version_max_rows; + if (max_rows <= 0) return Status::OK(); + std::vector> samll_version_rowsets(max_series_num); + int idx = 0; + bool is_bad = false; + if (tablet_state() == TABLET_RUNNING) { + for (auto& rs : _rs_version_map) { + bool is_delete = version_for_delete_predicate(rs.first); + if (rs.first.first > cumulative_layer_point()) { + // find samll rowset + if (!is_delete && rs.first.first > 0) { + if (rs.second->num_rows() < max_rows) { + samll_version_rowsets[idx].push_back(rs.second); + } else { + idx++; + if (idx > max_series_num) { + break; + } + } + } + } + } + int max_idx = samll_version_rowsets.size() > 0 ? 0 : -1; + std::vector result; + if (is_bad) Status::OK(); + for (int i = 0; i < samll_version_rowsets.size(); i++) { + if (samll_version_rowsets[i].size() > samll_version_rowsets[max_idx].size()) { + max_idx = i; + } + result = samll_version_rowsets[max_idx]; + } + for (int i = 0; i < result.size(); i++) { + input_rowsets->push_back(result[i]); + } + std::sort(input_rowsets->begin(), input_rowsets->end(), Rowset::comparator); + } + return Status::OK(); +} + Status Tablet::split_range(const OlapTuple& start_key_strings, const OlapTuple& end_key_strings, uint64_t request_block_row_count, std::vector* ranges) { DCHECK(ranges != nullptr); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index a7f1830ef13885..2089df0cb1ada8 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -72,6 +72,7 @@ class Tablet : public BaseTablet { // Used in clone task, to update local meta when finishing a clone job Status revise_tablet_meta(const std::vector& rowsets_to_clone, const std::vector& versions_to_delete); + Status pick_samll_verson_rowsets(std::vector* input_rowsets); const int64_t cumulative_layer_point() const; void set_cumulative_layer_point(int64_t new_point); @@ -189,6 +190,10 @@ class Tablet : public BaseTablet { _last_cumu_compaction_success_millis = millis; } + void set_last_small_compaction_success_time(int64_t millis) { + _last_small_compaction_success_time_millis = millis; + } + int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; } void set_last_base_compaction_success_time(int64_t millis) { _last_base_compaction_success_millis = millis; @@ -335,7 +340,7 @@ class Tablet : public BaseTablet { std::atomic _last_cumu_compaction_success_millis; // timestamp of last base compaction success std::atomic _last_base_compaction_success_millis; - + std::atomic _last_small_compaction_success_time_millis; std::atomic _cumulative_point; std::atomic _newly_created_rowset_num; std::atomic _last_checkpoint_time; diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index de4d6449f2d549..5c7397c8ad63c1 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -28,8 +28,11 @@ namespace doris { using std::map; EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, - std::vector* error_tablet_ids) - : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids) {} + std::vector* error_tablet_ids, + std::vector* succ_tablet_ids) + : _publish_version_req(publish_version_req), + _error_tablet_ids(error_tablet_ids), + _succ_tablet_ids(succ_tablet_ids) {} Status EnginePublishVersionTask::finish() { Status res = Status::OK(); @@ -106,6 +109,9 @@ Status EnginePublishVersionTask::finish() { res = publish_status; continue; } + if (_succ_tablet_ids != nullptr) { + _succ_tablet_ids->push_back(tablet_info.tablet_id); + } partition_related_tablet_infos.erase(tablet_info); VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name() << ", transaction_id=" << transaction_id << ", version=" << version.first diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 2601ed8f60f389..4086f466d3da94 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -27,7 +27,8 @@ namespace doris { class EnginePublishVersionTask : public EngineTask { public: EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, - vector* error_tablet_ids); + vector* error_tablet_ids, + std::vector* succ_tablet_ids = nullptr); ~EnginePublishVersionTask() {} virtual Status finish() override; @@ -35,6 +36,7 @@ class EnginePublishVersionTask : public EngineTask { private: const TPublishVersionRequest& _publish_version_req; vector* _error_tablet_ids; + vector* _succ_tablet_ids; }; } // namespace doris From a087337875c351ad57febb76698d3ab74406189b Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Thu, 26 May 2022 22:13:22 +0800 Subject: [PATCH 02/11] [Feature] compaction quickly for small data import #9791 --- be/src/exprs/table_function/explode_json_array.h | 7 ++++--- be/src/olap/delta_writer.cpp | 7 +++---- be/src/olap/rowset/segment_v2/binary_plain_page.h | 3 +-- be/src/vec/common/cow.h | 4 ++-- be/test/util/threadpool_test.cpp | 2 +- 5 files changed, 11 insertions(+), 12 deletions(-) diff --git a/be/src/exprs/table_function/explode_json_array.h b/be/src/exprs/table_function/explode_json_array.h index 616535ea1282e3..d98c56d929b2b6 100644 --- a/be/src/exprs/table_function/explode_json_array.h +++ b/be/src/exprs/table_function/explode_json_array.h @@ -87,9 +87,10 @@ struct ParsedData { *output = _data[offset]; break; case ExplodeJsonArrayType::STRING: - *output = _string_nulls[offset] ? nullptr - : real ? reinterpret_cast(_backup_string[offset].data()) - : &_data_string[offset]; + *output = _string_nulls[offset] + ? nullptr + : real ? reinterpret_cast(_backup_string[offset].data()) + : &_data_string[offset]; break; default: CHECK(false) << type; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 7ededcea491f7b..c409012af60136 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -102,10 +102,9 @@ Status DeltaWriter::init() { if (_tablet->version_count() > config::max_tablet_version_num) { //trigger samll compaction StorageEngine::instance()->submit_samll_compaction_task(_tablet); - LOG(WARNING) << "failed to init delta writer. version count: " - << _tablet->version_count() - << ", exceed limit: " << config::max_tablet_version_num - << ". tablet: " << _tablet->full_name(); + LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count() + << ", exceed limit: " << config::max_tablet_version_num + << ". tablet: " << _tablet->full_name(); return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION); } diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h index 58dad51b90cbe1..e8c8f2a18edaec 100644 --- a/be/src/olap/rowset/segment_v2/binary_plain_page.h +++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h @@ -271,8 +271,7 @@ class BinaryPlainPageDecoder : public PageDecoder { } void get_dict_word_info(StringRef* dict_word_info) { - if (_num_elems <= 0) [[unlikely]] - return; + if (_num_elems <= 0) [[unlikely]] return; char* data_begin = (char*)&_data[0]; char* offset_ptr = (char*)&_data[_offsets_pos]; diff --git a/be/src/vec/common/cow.h b/be/src/vec/common/cow.h index 27f3e864a69a50..3cefc293b2b6a2 100644 --- a/be/src/vec/common/cow.h +++ b/be/src/vec/common/cow.h @@ -350,8 +350,8 @@ class COW { const T& operator*() const { return *value; } T& operator*() { return value->assume_mutable_ref(); } - operator const immutable_ptr&() const { return value; } - operator immutable_ptr&() { return value; } + operator const immutable_ptr &() const { return value; } + operator immutable_ptr &() { return value; } operator bool() const { return value != nullptr; } bool operator!() const { return value == nullptr; } diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index eceda73f552b51..33c9fe3817d769 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -236,7 +236,7 @@ TEST_F(ThreadPoolTest, TestRace) { // so an cast is needed to use std::bind EXPECT_TRUE(_pool ->submit_func(std::bind( - (void(CountDownLatch::*)())(&CountDownLatch::count_down), &l)) + (void (CountDownLatch::*)())(&CountDownLatch::count_down), &l)) .ok()); l.wait(); // Sleeping a different amount in each iteration makes it more likely to hit From 13e54c623c2c85a6ebc94a01b56c193ba603fff9 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Tue, 31 May 2022 21:41:41 +0800 Subject: [PATCH 03/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_rowset_rows default 1000 --- be/src/agent/task_worker_pool.cpp | 9 +++++++-- be/src/common/config.h | 4 ++-- .../exprs/table_function/explode_json_array.h | 7 +++---- be/src/olap/compaction.cpp | 12 ++++++------ be/src/olap/compaction.h | 2 +- be/src/olap/delta_writer.cpp | 2 +- be/src/olap/olap_server.cpp | 6 ++++++ .../rowset/segment_v2/binary_plain_page.h | 3 ++- be/src/olap/storage_engine.h | 2 +- be/src/olap/tablet.cpp | 19 +++++++------------ be/src/olap/tablet.h | 2 +- be/src/vec/common/cow.h | 4 ++-- be/test/util/threadpool_test.cpp | 2 +- 13 files changed, 40 insertions(+), 34 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index e1b249dcacfd92..63ef6716f55ecf 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -719,18 +719,23 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { << ", error_code=" << res; finish_task_request.__set_error_tablet_ids(error_tablet_ids); } else { + int submit_tablets = 0; for (int i = 0; i < succ_tablet_ids.size(); i++) { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(succ_tablet_ids[i]); + if (tablet->version_count() < config::small_compaction_batch_size) { + continue; + } if (tablet != nullptr) { - StorageEngine::instance()->submit_samll_compaction_task(tablet); + submit_tablets++; + StorageEngine::instance()->submit_small_compaction_task(tablet); LOG(INFO) << "tirgger samll compaction succ" << succ_tablet_ids[i]; } else { LOG(WARNING) << "tirgger samll compaction failed" << succ_tablet_ids[i]; } } LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature - << ", size:" << succ_tablet_ids.size(); + << ", size:" << submit_tablets; } res.to_thrift(&finish_task_request.task_status); diff --git a/be/src/common/config.h b/be/src/common/config.h index f7b9b601218afc..3585644ea836fa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -742,9 +742,9 @@ CONF_Int32(parquet_reader_max_buffer_size, "50"); // if it is lower than a specific threshold, the predicate will be disabled. CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); -// For continuous versions that rows less than small_compaction_max_rows will trigger compaction quickly +// For continuous versions that rows less than small_compaction_rowset_rows will trigger compaction quickly // if set to 0 means turn off this feature -CONF_Int32(small_compaction_max_rows, "1000"); +CONF_Int32(small_compaction_rowset_rows, "1000"); // min compaction versions CONF_Int32(small_compaction_batch_size, "20"); diff --git a/be/src/exprs/table_function/explode_json_array.h b/be/src/exprs/table_function/explode_json_array.h index d98c56d929b2b6..616535ea1282e3 100644 --- a/be/src/exprs/table_function/explode_json_array.h +++ b/be/src/exprs/table_function/explode_json_array.h @@ -87,10 +87,9 @@ struct ParsedData { *output = _data[offset]; break; case ExplodeJsonArrayType::STRING: - *output = _string_nulls[offset] - ? nullptr - : real ? reinterpret_cast(_backup_string[offset].data()) - : &_data_string[offset]; + *output = _string_nulls[offset] ? nullptr + : real ? reinterpret_cast(_backup_string[offset].data()) + : &_data_string[offset]; break; default: CHECK(false) << type; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 1da0d080d77207..1e3ad9cfcc8233 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -55,21 +55,21 @@ Status Compaction::execute_compact() { return st; } -Status Compaction::samll_rowsets_compact() { +Status Compaction::small_rowsets_compact() { std::unique_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name(); return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR); } _input_rowsets.clear(); - int vertion_count = _tablet->version_count(); + int version_count = _tablet->version_count(); int64_t now = UnixMillis(); - _tablet->pick_samll_verson_rowsets(&_input_rowsets); - if (_input_rowsets.size() >= 5) { + _tablet->pick_small_verson_rowsets(&_input_rowsets); + if (_input_rowsets.size() >= config::small_compaction_batch_size) { do_compaction(0); - LOG(INFO) << "samll_rowsets_compact,before_versions:" << vertion_count + LOG(INFO) << "small_rowsets_compaction, before_versions:" << version_count << ", after_versions:" << _tablet->version_count() - << ", cost:" << (UnixMillis() - now) / 1000; + << ", cost:" << (UnixMillis() - now) << "ms"; } return Status::OK(); } diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 603bf1f40c597a..3a26e0544b0bb0 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -48,7 +48,7 @@ class Compaction { // This is only for http CompactionAction Status compact(); - Status samll_rowsets_compact(); + Status small_rowsets_compact(); virtual Status prepare_compact() = 0; Status execute_compact(); diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index c409012af60136..7221ee4e8b378a 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -101,7 +101,7 @@ Status DeltaWriter::init() { // check tablet version number if (_tablet->version_count() > config::max_tablet_version_num) { //trigger samll compaction - StorageEngine::instance()->submit_samll_compaction_task(_tablet); + StorageEngine::instance()->submit_small_compaction_task(_tablet); LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count() << ", exceed limit: " << config::max_tablet_version_num << ". tablet: " << _tablet->full_name(); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 56319b2c6ca927..d7486669dc5015 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -73,6 +73,7 @@ Status StorageEngine::start_bg_threads() { .set_max_threads(max_thread_num) .build(&_compaction_thread_pool); + int32_t convert_rowset_thread_num = config::convert_rowset_thread_num; if (convert_rowset_thread_num > 0) { ThreadPoolBuilder("ConvertRowsetTaskThreadPool") @@ -91,6 +92,11 @@ Status StorageEngine::start_bg_threads() { ThreadPoolBuilder("CompactionTaskThreadPool") .set_min_threads(max_thread_num) .set_max_threads(max_thread_num) + .build(&_compaction_thread_pool); + + ThreadPoolBuilder("SmallCompactionTaskThreadPool") + .set_min_threads(config::small_compaction_max_threads) + .set_max_threads(config::small_compaction_max_threads) .build(&_samll_compaction_thread_pool); // compaction tasks producer thread diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h index e8c8f2a18edaec..58dad51b90cbe1 100644 --- a/be/src/olap/rowset/segment_v2/binary_plain_page.h +++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h @@ -271,7 +271,8 @@ class BinaryPlainPageDecoder : public PageDecoder { } void get_dict_word_info(StringRef* dict_word_info) { - if (_num_elems <= 0) [[unlikely]] return; + if (_num_elems <= 0) [[unlikely]] + return; char* data_begin = (char*)&_data[0]; char* offset_ptr = (char*)&_data[_offsets_pos]; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index db20e29813bedb..8478a2e9f3cac0 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -193,7 +193,7 @@ class StorageEngine { void check_cumulative_compaction_config(); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); - Status submit_samll_compaction_task(TabletSharedPtr tablet); + Status submit_small_compaction_task(TabletSharedPtr tablet); private: // Instance should be inited from `static open()` diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 2695d7376e27ee..dea8786f0ffd8d 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -873,16 +873,13 @@ void Tablet::calculate_cumulative_point() { set_cumulative_layer_point(ret_cumulative_point); } -//1.小版本rowset -//2.不改变cp,cp值的更新有cc任务来执行 -//3.找到最大的,连续小版本rowset -Status Tablet::pick_samll_verson_rowsets(std::vector* input_rowsets) { +//find rowsets that rows less then "config::small_version_max_rows" +Status Tablet::pick_small_verson_rowsets(std::vector* input_rowsets) { int max_series_num = 1000; - int max_rows = config::small_version_max_rows; + int max_rows = config::small_compaction_rowset_rows; if (max_rows <= 0) return Status::OK(); std::vector> samll_version_rowsets(max_series_num); int idx = 0; - bool is_bad = false; if (tablet_state() == TABLET_RUNNING) { for (auto& rs : _rs_version_map) { bool is_delete = version_for_delete_predicate(rs.first); @@ -900,14 +897,12 @@ Status Tablet::pick_samll_verson_rowsets(std::vector* input_row } } } - int max_idx = samll_version_rowsets.size() > 0 ? 0 : -1; - std::vector result; - if (is_bad) Status::OK(); + if (samll_version_rowsets.size() == 0) return Status::OK(); + std::vector result = samll_version_rowsets[0]; for (int i = 0; i < samll_version_rowsets.size(); i++) { - if (samll_version_rowsets[i].size() > samll_version_rowsets[max_idx].size()) { - max_idx = i; + if (samll_version_rowsets[i].size() > result.size()) { + result = samll_version_rowsets[i]; } - result = samll_version_rowsets[max_idx]; } for (int i = 0; i < result.size(); i++) { input_rowsets->push_back(result[i]); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 2089df0cb1ada8..6a072ea120166f 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -72,7 +72,7 @@ class Tablet : public BaseTablet { // Used in clone task, to update local meta when finishing a clone job Status revise_tablet_meta(const std::vector& rowsets_to_clone, const std::vector& versions_to_delete); - Status pick_samll_verson_rowsets(std::vector* input_rowsets); + Status pick_small_verson_rowsets(std::vector* input_rowsets); const int64_t cumulative_layer_point() const; void set_cumulative_layer_point(int64_t new_point); diff --git a/be/src/vec/common/cow.h b/be/src/vec/common/cow.h index 3cefc293b2b6a2..27f3e864a69a50 100644 --- a/be/src/vec/common/cow.h +++ b/be/src/vec/common/cow.h @@ -350,8 +350,8 @@ class COW { const T& operator*() const { return *value; } T& operator*() { return value->assume_mutable_ref(); } - operator const immutable_ptr &() const { return value; } - operator immutable_ptr &() { return value; } + operator const immutable_ptr&() const { return value; } + operator immutable_ptr&() { return value; } operator bool() const { return value != nullptr; } bool operator!() const { return value == nullptr; } diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index 33c9fe3817d769..eceda73f552b51 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -236,7 +236,7 @@ TEST_F(ThreadPoolTest, TestRace) { // so an cast is needed to use std::bind EXPECT_TRUE(_pool ->submit_func(std::bind( - (void (CountDownLatch::*)())(&CountDownLatch::count_down), &l)) + (void(CountDownLatch::*)())(&CountDownLatch::count_down), &l)) .ok()); l.wait(); // Sleeping a different amount in each iteration makes it more likely to hit From 0d9bcda20939434631eda86d7d406cc07f5368e6 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Tue, 31 May 2022 22:18:31 +0800 Subject: [PATCH 04/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_rowset_rows default 1000 --- be/src/common/config.h | 1 + be/src/olap/tablet.cpp | 21 +++++++++------------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 3585644ea836fa..a150feffc7a0d2 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -738,6 +738,7 @@ CONF_Int32(object_pool_buffer_size, "100"); // ParquetReaderWrap prefetch buffer size CONF_Int32(parquet_reader_max_buffer_size, "50"); + // When the rows number reached this limit, will check the filter rate the of bloomfilter // if it is lower than a specific threshold, the predicate will be disabled. CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index dea8786f0ffd8d..50ba209999f902 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -873,26 +873,23 @@ void Tablet::calculate_cumulative_point() { set_cumulative_layer_point(ret_cumulative_point); } -//find rowsets that rows less then "config::small_version_max_rows" +//find rowsets that rows less then "config::small_compaction_max_rows" Status Tablet::pick_small_verson_rowsets(std::vector* input_rowsets) { int max_series_num = 1000; - int max_rows = config::small_compaction_rowset_rows; + int max_rows = config::small_compaction_max_rows; if (max_rows <= 0) return Status::OK(); std::vector> samll_version_rowsets(max_series_num); int idx = 0; if (tablet_state() == TABLET_RUNNING) { for (auto& rs : _rs_version_map) { bool is_delete = version_for_delete_predicate(rs.first); - if (rs.first.first > cumulative_layer_point()) { - // find samll rowset - if (!is_delete && rs.first.first > 0) { - if (rs.second->num_rows() < max_rows) { - samll_version_rowsets[idx].push_back(rs.second); - } else { - idx++; - if (idx > max_series_num) { - break; - } + if (!is_delete && rs.first.first > 0 && rs.first.first > cumulative_layer_point()) { + if (rs.second->num_rows() < max_rows) { + samll_version_rowsets[idx].push_back(rs.second); + } else { + idx++; + if (idx > max_series_num) { + break; } } } From 0cc87e746844168124f7d3f9b06f6fd7bdd53a86 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Tue, 31 May 2022 22:55:46 +0800 Subject: [PATCH 05/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_rowset_rows default 1000 --- be/src/common/config.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index a150feffc7a0d2..0687a5c2c3233b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -226,6 +226,9 @@ CONF_Int64(index_stream_cache_capacity, "10737418240"); // Cache for storage page size CONF_String(storage_page_cache_limit, "20%"); +// Shard size for page cache, the value must be power of two. +// It's recommended to set it to a value close to the number of BE cores in order to reduce lock contentions. +CONF_Int32(storage_page_cache_shard_size, "16"); // Percentage for index page cache // all storage page cache will be divided into data_page_cache and index_page_cache CONF_Int32(index_page_cache_percentage, "10"); @@ -743,9 +746,9 @@ CONF_Int32(parquet_reader_max_buffer_size, "50"); // if it is lower than a specific threshold, the predicate will be disabled. CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); -// For continuous versions that rows less than small_compaction_rowset_rows will trigger compaction quickly +// For continuous versions that rows less than small_compaction_max_rows will trigger compaction quickly // if set to 0 means turn off this feature -CONF_Int32(small_compaction_rowset_rows, "1000"); +CONF_Int32(small_compaction_max_rows, "1000"); // min compaction versions CONF_Int32(small_compaction_batch_size, "20"); From 771324edd1ce3ebf1d350aebdacaf3953b88cc71 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Tue, 31 May 2022 22:59:39 +0800 Subject: [PATCH 06/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_rowset_rows default 1000 --- be/src/common/config.h | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 0687a5c2c3233b..623a51328b0a19 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -741,7 +741,6 @@ CONF_Int32(object_pool_buffer_size, "100"); // ParquetReaderWrap prefetch buffer size CONF_Int32(parquet_reader_max_buffer_size, "50"); - // When the rows number reached this limit, will check the filter rate the of bloomfilter // if it is lower than a specific threshold, the predicate will be disabled. CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); From b6aa87c5d9308dc6462cedc18117a95cab70d704 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Wed, 1 Jun 2022 15:23:48 +0800 Subject: [PATCH 07/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_rowset_rows default 1000 --- be/src/common/config.h | 4 ++-- be/src/common/status.h | 2 +- be/src/olap/compaction.cpp | 7 +++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 623a51328b0a19..10c9928e2c59ca 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -747,10 +747,10 @@ CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); // For continuous versions that rows less than small_compaction_max_rows will trigger compaction quickly // if set to 0 means turn off this feature -CONF_Int32(small_compaction_max_rows, "1000"); +CONF_Int32(small_compaction_max_rows, "10000"); // min compaction versions -CONF_Int32(small_compaction_batch_size, "20"); +CONF_Int32(small_compaction_batch_size, "10"); } // namespace config diff --git a/be/src/common/status.h b/be/src/common/status.h index 31a58a28df210a..58fdb7d2f453cf 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -92,7 +92,7 @@ namespace doris { M(OLAP_ERR_CE_LOAD_TABLE_ERROR, -303, "", true) \ M(OLAP_ERR_CE_NOT_FINISHED, -304, "", true) \ M(OLAP_ERR_CE_TABLET_ID_EXIST, -305, "", true) \ - M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", true) \ + M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", false) \ M(OLAP_ERR_TABLE_VERSION_DUPLICATE_ERROR, -400, "", true) \ M(OLAP_ERR_TABLE_VERSION_INDEX_MISMATCH_ERROR, -401, "", true) \ M(OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR, -402, "", true) \ diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 1e3ad9cfcc8233..fc473fb405a182 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -58,18 +58,21 @@ Status Compaction::execute_compact() { Status Compaction::small_rowsets_compact() { std::unique_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { - LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name(); + LOG(WARNING) << "The tablet is under cumulative compaction. tablet=" + << _tablet->full_name(); return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR); } _input_rowsets.clear(); int version_count = _tablet->version_count(); int64_t now = UnixMillis(); _tablet->pick_small_verson_rowsets(&_input_rowsets); + int nums = _input_rowsets.size(); if (_input_rowsets.size() >= config::small_compaction_batch_size) { do_compaction(0); LOG(INFO) << "small_rowsets_compaction, before_versions:" << version_count << ", after_versions:" << _tablet->version_count() - << ", cost:" << (UnixMillis() - now) << "ms"; + << ", cost:" << (UnixMillis() - now) << "ms" + << ", merged: " << nums << ", batch:" << config::small_compaction_batch_size; } return Status::OK(); } From c862940efbd7b0653da3b41a633e16cd38fafdde Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Mon, 6 Jun 2022 21:58:43 +0800 Subject: [PATCH 08/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_max_rows default 10000 --- be/src/agent/task_worker_pool.cpp | 36 ++++++++++++--------- be/src/common/config.h | 2 ++ be/src/olap/compaction.cpp | 54 +++++++++++++++++++++++++++---- be/src/olap/tablet.cpp | 25 ++++++++++---- be/src/olap/tablet.h | 3 +- 5 files changed, 90 insertions(+), 30 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 63ef6716f55ecf..944672442ec0c0 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -719,23 +719,29 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { << ", error_code=" << res; finish_task_request.__set_error_tablet_ids(error_tablet_ids); } else { - int submit_tablets = 0; - for (int i = 0; i < succ_tablet_ids.size(); i++) { - TabletSharedPtr tablet = - StorageEngine::instance()->tablet_manager()->get_tablet(succ_tablet_ids[i]); - if (tablet->version_count() < config::small_compaction_batch_size) { - continue; - } - if (tablet != nullptr) { - submit_tablets++; - StorageEngine::instance()->submit_small_compaction_task(tablet); - LOG(INFO) << "tirgger samll compaction succ" << succ_tablet_ids[i]; - } else { - LOG(WARNING) << "tirgger samll compaction failed" << succ_tablet_ids[i]; + if (config::small_compaction_batch_size > 0) { + int submit_tablets = 0; + for (int i = 0; i < succ_tablet_ids.size(); i++) { + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet( + succ_tablet_ids[i]); + if (tablet != nullptr) { + submit_tablets++; + tablet->publised_count++; + if (tablet->publised_count % config::small_compaction_batch_size == 0) { + StorageEngine::instance()->submit_small_compaction_task(tablet); + LOG(INFO) << "trigger small compaction succ, tabletid:" + << succ_tablet_ids[i] + << ", publised:" << tablet->publised_count; + } + } else { + LOG(WARNING) << "trigger samll compaction failed, tabletid:" + << succ_tablet_ids[i]; + } } + LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature + << ", size:" << submit_tablets; } - LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature - << ", size:" << submit_tablets; } res.to_thrift(&finish_task_request.task_status); diff --git a/be/src/common/config.h b/be/src/common/config.h index 10c9928e2c59ca..ae828d5fdd289a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -751,6 +751,8 @@ CONF_Int32(small_compaction_max_rows, "10000"); // min compaction versions CONF_Int32(small_compaction_batch_size, "10"); +// do compaction min rowsets +CONF_Int32(small_compaction_min_rowsets, "10"); } // namespace config diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index fc473fb405a182..e4d87cbb712a84 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -62,17 +62,57 @@ Status Compaction::small_rowsets_compact() { << _tablet->full_name(); return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR); } + + // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked + // for compaction may change. In this case, current compaction task should not be executed. + if (_tablet->get_clone_occurred()) { + _tablet->set_clone_occurred(false); + return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED); + } + _input_rowsets.clear(); int version_count = _tablet->version_count(); int64_t now = UnixMillis(); - _tablet->pick_small_verson_rowsets(&_input_rowsets); + int64_t permits = 0; + _tablet->pick_small_verson_rowsets(&_input_rowsets, &permits); + std::string input_ver = ""; + for (int i = 0; i < _input_rowsets.size(); i++) { + input_ver.append("[" + std::to_string(_input_rowsets[i]->start_version()) + ","); + input_ver.append(std::to_string(_input_rowsets[i]->start_version()) + "]"); + } + + std::vector missedVersions; + find_longest_consecutive_version(&_input_rowsets, &missedVersions); + if (missedVersions.size() != 0) { + std::string logstr = ""; + for (int i = 0; i < missedVersions.size(); i++) { + logstr.append("[" + std::to_string(missedVersions[i].first)); + logstr.append("," + std::to_string(missedVersions[i].second)); + logstr.append("]"); + } + LOG(WARNING) << "small_rowsets_compaction, find missed version" << logstr + << ",input_size:" << _input_rowsets.size() << "version:" << input_ver; + } int nums = _input_rowsets.size(); - if (_input_rowsets.size() >= config::small_compaction_batch_size) { - do_compaction(0); - LOG(INFO) << "small_rowsets_compaction, before_versions:" << version_count - << ", after_versions:" << _tablet->version_count() - << ", cost:" << (UnixMillis() - now) << "ms" - << ", merged: " << nums << ", batch:" << config::small_compaction_batch_size; + if (_input_rowsets.size() >= config::small_compaction_min_rowsets) { + Status st = check_version_continuity(_input_rowsets); + if (!st.ok()) { + LOG(WARNING) << "small_rowsets_compaction failed, cause version not continuous"; + return st; + } + st = do_compaction(permits); + if (!st.ok()) { + gc_output_rowset(); + LOG(WARNING) << "small_rowsets_compaction failed"; + } else { + LOG(INFO) << "small_rowsets_compaction succ" + << ", before_versions:" << version_count + << ", after_versions:" << _tablet->version_count() + << ", cost:" << (UnixMillis() - now) << "ms" + << ", merged: " << nums << ", batch:" << config::small_compaction_batch_size + << ", segments:" << permits << ", tabletid:" << _tablet->tablet_id(); + _tablet->set_last_small_compaction_success_time(UnixMillis()); + } } return Status::OK(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 50ba209999f902..7d4992ae521e37 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -874,18 +874,29 @@ void Tablet::calculate_cumulative_point() { } //find rowsets that rows less then "config::small_compaction_max_rows" -Status Tablet::pick_small_verson_rowsets(std::vector* input_rowsets) { +Status Tablet::pick_small_verson_rowsets(std::vector* input_rowsets, + int64_t* permits) { + if (!init_succeeded()) { + return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS); + } int max_series_num = 1000; int max_rows = config::small_compaction_max_rows; if (max_rows <= 0) return Status::OK(); std::vector> samll_version_rowsets(max_series_num); int idx = 0; + std::shared_lock rdlock(_meta_lock); + std::vector sortedRowset; + for (auto& rs : _rs_version_map) { + sortedRowset.push_back(rs.second); + } + std::sort(sortedRowset.begin(), sortedRowset.end(), Rowset::comparator); if (tablet_state() == TABLET_RUNNING) { - for (auto& rs : _rs_version_map) { - bool is_delete = version_for_delete_predicate(rs.first); - if (!is_delete && rs.first.first > 0 && rs.first.first > cumulative_layer_point()) { - if (rs.second->num_rows() < max_rows) { - samll_version_rowsets[idx].push_back(rs.second); + for (int i = 0; i < sortedRowset.size(); i++) { + bool is_delete = version_for_delete_predicate(sortedRowset[i]->version()); + if (!is_delete && sortedRowset[i]->start_version() > 0 && + sortedRowset[i]->start_version() > cumulative_layer_point()) { + if (sortedRowset[i]->num_rows() < max_rows) { + samll_version_rowsets[idx].push_back(sortedRowset[i]); } else { idx++; if (idx > max_series_num) { @@ -902,9 +913,9 @@ Status Tablet::pick_small_verson_rowsets(std::vector* input_row } } for (int i = 0; i < result.size(); i++) { + *permits += result[i]->num_segments(); input_rowsets->push_back(result[i]); } - std::sort(input_rowsets->begin(), input_rowsets->end(), Rowset::comparator); } return Status::OK(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 6a072ea120166f..ab6203dbedfa58 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -72,7 +72,7 @@ class Tablet : public BaseTablet { // Used in clone task, to update local meta when finishing a clone job Status revise_tablet_meta(const std::vector& rowsets_to_clone, const std::vector& versions_to_delete); - Status pick_small_verson_rowsets(std::vector* input_rowsets); + Status pick_small_verson_rowsets(std::vector* input_rowsets, int64_t* permits); const int64_t cumulative_layer_point() const; void set_cumulative_layer_point(int64_t new_point); @@ -367,6 +367,7 @@ class Tablet : public BaseTablet { public: IntCounter* flush_bytes; IntCounter* flush_count; + std::atomic publised_count = 0; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { From ada67cdcad695131ebeb580e450a02aa37daafb9 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Tue, 7 Jun 2022 15:26:19 +0800 Subject: [PATCH 09/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_max_rows default 10000 --- be/src/agent/task_worker_pool.cpp | 2 +- be/src/olap/delta_writer.cpp | 2 +- be/src/olap/olap_server.cpp | 14 +++++++------- be/src/olap/storage_engine.h | 4 ++-- be/src/olap/tablet.cpp | 14 +++++++------- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 944672442ec0c0..573f1ea293819a 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -735,7 +735,7 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { << ", publised:" << tablet->publised_count; } } else { - LOG(WARNING) << "trigger samll compaction failed, tabletid:" + LOG(WARNING) << "trigger small compaction failed, tabletid:" << succ_tablet_ids[i]; } } diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 7221ee4e8b378a..a45e955d8916eb 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -100,7 +100,7 @@ Status DeltaWriter::init() { MemTracker::create_tracker(-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id())); // check tablet version number if (_tablet->version_count() > config::max_tablet_version_num) { - //trigger samll compaction + //trigger small compaction StorageEngine::instance()->submit_small_compaction_task(_tablet); LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count() << ", exceed limit: " << config::max_tablet_version_num diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index d7486669dc5015..76c5daff4b5b03 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -93,11 +93,11 @@ Status StorageEngine::start_bg_threads() { .set_min_threads(max_thread_num) .set_max_threads(max_thread_num) .build(&_compaction_thread_pool); - + ThreadPoolBuilder("SmallCompactionTaskThreadPool") .set_min_threads(config::small_compaction_max_threads) .set_max_threads(config::small_compaction_max_threads) - .build(&_samll_compaction_thread_pool); + .build(&_small_compaction_thread_pool); // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( @@ -672,15 +672,15 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, return _submit_compaction_task(tablet, compaction_type); } -Status StorageEngine::_handle_samll_compaction(TabletSharedPtr tablet) { +Status StorageEngine::_handle_small_compaction(TabletSharedPtr tablet) { CumulativeCompaction compact(tablet); - compact.samll_rowsets_compact(); + compact.small_rowsets_compact(); return Status::OK(); } -Status StorageEngine::submit_samll_compaction_task(TabletSharedPtr tablet) { - _samll_compaction_thread_pool->submit_func( - std::bind(&StorageEngine::_handle_samll_compaction, this, tablet)); +Status StorageEngine::submit_small_compaction_task(TabletSharedPtr tablet) { + _small_compaction_thread_pool->submit_func( + std::bind(&StorageEngine::_handle_small_compaction, this, tablet)); return Status::OK(); } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 8478a2e9f3cac0..2ddf49689f61f0 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -271,7 +271,7 @@ class StorageEngine { Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); - Status _handle_samll_compaction(TabletSharedPtr); + Status _handle_small_compaction(TabletSharedPtr); private: struct CompactionCandidate { @@ -381,7 +381,7 @@ class StorageEngine { HeartbeatFlags* _heartbeat_flags; std::unique_ptr _compaction_thread_pool; - std::unique_ptr _samll_compaction_thread_pool; + std::unique_ptr _small_compaction_thread_pool; scoped_refptr _alpha_rowset_scan_thread; std::unique_ptr _convert_rowset_thread_pool; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 7d4992ae521e37..f6751d8ad0da3b 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -882,7 +882,7 @@ Status Tablet::pick_small_verson_rowsets(std::vector* input_row int max_series_num = 1000; int max_rows = config::small_compaction_max_rows; if (max_rows <= 0) return Status::OK(); - std::vector> samll_version_rowsets(max_series_num); + std::vector> small_version_rowsets(max_series_num); int idx = 0; std::shared_lock rdlock(_meta_lock); std::vector sortedRowset; @@ -896,7 +896,7 @@ Status Tablet::pick_small_verson_rowsets(std::vector* input_row if (!is_delete && sortedRowset[i]->start_version() > 0 && sortedRowset[i]->start_version() > cumulative_layer_point()) { if (sortedRowset[i]->num_rows() < max_rows) { - samll_version_rowsets[idx].push_back(sortedRowset[i]); + small_version_rowsets[idx].push_back(sortedRowset[i]); } else { idx++; if (idx > max_series_num) { @@ -905,11 +905,11 @@ Status Tablet::pick_small_verson_rowsets(std::vector* input_row } } } - if (samll_version_rowsets.size() == 0) return Status::OK(); - std::vector result = samll_version_rowsets[0]; - for (int i = 0; i < samll_version_rowsets.size(); i++) { - if (samll_version_rowsets[i].size() > result.size()) { - result = samll_version_rowsets[i]; + if (small_version_rowsets.size() == 0) return Status::OK(); + std::vector result = small_version_rowsets[0]; + for (int i = 0; i < small_version_rowsets.size(); i++) { + if (small_version_rowsets[i].size() > result.size()) { + result = small_version_rowsets[i]; } } for (int i = 0; i < result.size(); i++) { From 95ffbbed0fc20b0fc1e579bff1206a53ee20e0ad Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Tue, 7 Jun 2022 15:55:40 +0800 Subject: [PATCH 10/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_max_rows default 10000 --- be/src/common/config.h | 1 - be/src/olap/olap_server.cpp | 1 - 2 files changed, 2 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index ae828d5fdd289a..21e0342113f53f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -294,7 +294,6 @@ CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds // This config can be set to limit thread number in compaction thread pool. CONF_mInt32(max_compaction_threads, "10"); - // This config can be set to limit thread number in convert rowset thread pool. CONF_mInt32(convert_rowset_thread_num, "0"); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 76c5daff4b5b03..6e857f52f77103 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -73,7 +73,6 @@ Status StorageEngine::start_bg_threads() { .set_max_threads(max_thread_num) .build(&_compaction_thread_pool); - int32_t convert_rowset_thread_num = config::convert_rowset_thread_num; if (convert_rowset_thread_num > 0) { ThreadPoolBuilder("ConvertRowsetTaskThreadPool") From 21779deebb10e8bc3a2ba5900beae90bc6d66861 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Thu, 9 Jun 2022 22:03:36 +0800 Subject: [PATCH 11/11] compaction quickly for small data import #9791 1.merge small versions of rowset as soon as possible to increase the import frequency of small version data 2.small version means that the number of rows is less than config::small_compaction_max_rows default 10000 --- be/src/agent/task_worker_pool.cpp | 14 ++++++------ be/src/common/config.h | 14 ++++++------ be/src/olap/compaction.cpp | 37 +++++++++++-------------------- be/src/olap/compaction.h | 2 +- be/src/olap/delta_writer.cpp | 6 +++-- be/src/olap/olap_server.cpp | 16 ++++++------- be/src/olap/storage_engine.h | 6 ++--- be/src/olap/tablet.cpp | 27 ++++++++++++---------- be/src/olap/tablet.h | 9 ++++---- 9 files changed, 63 insertions(+), 68 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 573f1ea293819a..c1502c33c6a95f 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -719,8 +719,8 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { << ", error_code=" << res; finish_task_request.__set_error_tablet_ids(error_tablet_ids); } else { - if (config::small_compaction_batch_size > 0) { - int submit_tablets = 0; + int submit_tablets = 0; + if (config::enable_quick_compaction && config::quick_compaction_batch_size > 0) { for (int i = 0; i < succ_tablet_ids.size(); i++) { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( @@ -728,19 +728,19 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { if (tablet != nullptr) { submit_tablets++; tablet->publised_count++; - if (tablet->publised_count % config::small_compaction_batch_size == 0) { - StorageEngine::instance()->submit_small_compaction_task(tablet); - LOG(INFO) << "trigger small compaction succ, tabletid:" + if (tablet->publised_count % config::quick_compaction_batch_size == 0) { + StorageEngine::instance()->submit_quick_compaction_task(tablet); + LOG(INFO) << "trigger quick compaction succ, tabletid:" << succ_tablet_ids[i] << ", publised:" << tablet->publised_count; } } else { - LOG(WARNING) << "trigger small compaction failed, tabletid:" + LOG(WARNING) << "trigger quick compaction failed, tabletid:" << succ_tablet_ids[i]; } } LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature - << ", size:" << submit_tablets; + << ", size:" << succ_tablet_ids.size(); } } diff --git a/be/src/common/config.h b/be/src/common/config.h index 21e0342113f53f..34a8c19a811b66 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -301,7 +301,7 @@ CONF_mInt32(convert_rowset_thread_num, "0"); CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3"); // This config can be set to limit thread number in smallcompaction thread pool. -CONF_mInt32(small_compaction_max_threads, "10"); +CONF_mInt32(quick_compaction_max_threads, "10"); // Thread count to do tablet meta checkpoint, -1 means use the data directories count. CONF_Int32(max_meta_checkpoint_threads, "-1"); @@ -744,14 +744,14 @@ CONF_Int32(parquet_reader_max_buffer_size, "50"); // if it is lower than a specific threshold, the predicate will be disabled. CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); -// For continuous versions that rows less than small_compaction_max_rows will trigger compaction quickly -// if set to 0 means turn off this feature -CONF_Int32(small_compaction_max_rows, "10000"); - +//whether turn on quick compaction feature +CONF_Bool(enable_quick_compaction, "false"); +// For continuous versions that rows less than quick_compaction_max_rows will trigger compaction quickly +CONF_Int32(quick_compaction_max_rows, "1000"); // min compaction versions -CONF_Int32(small_compaction_batch_size, "10"); +CONF_Int32(quick_compaction_batch_size, "10"); // do compaction min rowsets -CONF_Int32(small_compaction_min_rowsets, "10"); +CONF_Int32(quick_compaction_min_rowsets, "10"); } // namespace config diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index e4d87cbb712a84..3ac480194ef8c8 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -55,7 +55,7 @@ Status Compaction::execute_compact() { return st; } -Status Compaction::small_rowsets_compact() { +Status Compaction::quick_rowsets_compact() { std::unique_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { LOG(WARNING) << "The tablet is under cumulative compaction. tablet=" @@ -72,46 +72,35 @@ Status Compaction::small_rowsets_compact() { _input_rowsets.clear(); int version_count = _tablet->version_count(); - int64_t now = UnixMillis(); + MonotonicStopWatch watch; + watch.start(); int64_t permits = 0; - _tablet->pick_small_verson_rowsets(&_input_rowsets, &permits); - std::string input_ver = ""; - for (int i = 0; i < _input_rowsets.size(); i++) { - input_ver.append("[" + std::to_string(_input_rowsets[i]->start_version()) + ","); - input_ver.append(std::to_string(_input_rowsets[i]->start_version()) + "]"); - } - + _tablet->pick_quick_compaction_rowsets(&_input_rowsets, &permits); std::vector missedVersions; find_longest_consecutive_version(&_input_rowsets, &missedVersions); if (missedVersions.size() != 0) { - std::string logstr = ""; - for (int i = 0; i < missedVersions.size(); i++) { - logstr.append("[" + std::to_string(missedVersions[i].first)); - logstr.append("," + std::to_string(missedVersions[i].second)); - logstr.append("]"); - } - LOG(WARNING) << "small_rowsets_compaction, find missed version" << logstr - << ",input_size:" << _input_rowsets.size() << "version:" << input_ver; + LOG(WARNING) << "quick_rowsets_compaction, find missed version" + << ",input_size:" << _input_rowsets.size(); } int nums = _input_rowsets.size(); - if (_input_rowsets.size() >= config::small_compaction_min_rowsets) { + if (_input_rowsets.size() >= config::quick_compaction_min_rowsets) { Status st = check_version_continuity(_input_rowsets); if (!st.ok()) { - LOG(WARNING) << "small_rowsets_compaction failed, cause version not continuous"; + LOG(WARNING) << "quick_rowsets_compaction failed, cause version not continuous"; return st; } st = do_compaction(permits); if (!st.ok()) { gc_output_rowset(); - LOG(WARNING) << "small_rowsets_compaction failed"; + LOG(WARNING) << "quick_rowsets_compaction failed"; } else { - LOG(INFO) << "small_rowsets_compaction succ" + LOG(INFO) << "quick_compaction succ" << ", before_versions:" << version_count << ", after_versions:" << _tablet->version_count() - << ", cost:" << (UnixMillis() - now) << "ms" - << ", merged: " << nums << ", batch:" << config::small_compaction_batch_size + << ", cost:" << (watch.elapsed_time() / 1000 / 1000) << "ms" + << ", merged: " << nums << ", batch:" << config::quick_compaction_batch_size << ", segments:" << permits << ", tabletid:" << _tablet->tablet_id(); - _tablet->set_last_small_compaction_success_time(UnixMillis()); + _tablet->set_last_quick_compaction_success_time(UnixMillis()); } } return Status::OK(); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 3a26e0544b0bb0..c70a82defaab52 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -48,7 +48,7 @@ class Compaction { // This is only for http CompactionAction Status compact(); - Status small_rowsets_compact(); + Status quick_rowsets_compact(); virtual Status prepare_compact() = 0; Status execute_compact(); diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index a45e955d8916eb..40f28516555c01 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -100,8 +100,10 @@ Status DeltaWriter::init() { MemTracker::create_tracker(-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id())); // check tablet version number if (_tablet->version_count() > config::max_tablet_version_num) { - //trigger small compaction - StorageEngine::instance()->submit_small_compaction_task(_tablet); + //trigger quick compaction + if (config::enable_quick_compaction) { + StorageEngine::instance()->submit_quick_compaction_task(_tablet); + } LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count() << ", exceed limit: " << config::max_tablet_version_num << ". tablet: " << _tablet->full_name(); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 6e857f52f77103..eb1d47c06559f9 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -94,9 +94,9 @@ Status StorageEngine::start_bg_threads() { .build(&_compaction_thread_pool); ThreadPoolBuilder("SmallCompactionTaskThreadPool") - .set_min_threads(config::small_compaction_max_threads) - .set_max_threads(config::small_compaction_max_threads) - .build(&_small_compaction_thread_pool); + .set_min_threads(config::quick_compaction_max_threads) + .set_max_threads(config::quick_compaction_max_threads) + .build(&_quick_compaction_thread_pool); // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( @@ -671,15 +671,15 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, return _submit_compaction_task(tablet, compaction_type); } -Status StorageEngine::_handle_small_compaction(TabletSharedPtr tablet) { +Status StorageEngine::_handle_quick_compaction(TabletSharedPtr tablet) { CumulativeCompaction compact(tablet); - compact.small_rowsets_compact(); + compact.quick_rowsets_compact(); return Status::OK(); } -Status StorageEngine::submit_small_compaction_task(TabletSharedPtr tablet) { - _small_compaction_thread_pool->submit_func( - std::bind(&StorageEngine::_handle_small_compaction, this, tablet)); +Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) { + _quick_compaction_thread_pool->submit_func( + std::bind(&StorageEngine::_handle_quick_compaction, this, tablet)); return Status::OK(); } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 2ddf49689f61f0..f13d007bfac1ba 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -193,7 +193,7 @@ class StorageEngine { void check_cumulative_compaction_config(); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); - Status submit_small_compaction_task(TabletSharedPtr tablet); + Status submit_quick_compaction_task(TabletSharedPtr tablet); private: // Instance should be inited from `static open()` @@ -271,7 +271,7 @@ class StorageEngine { Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); - Status _handle_small_compaction(TabletSharedPtr); + Status _handle_quick_compaction(TabletSharedPtr); private: struct CompactionCandidate { @@ -381,7 +381,7 @@ class StorageEngine { HeartbeatFlags* _heartbeat_flags; std::unique_ptr _compaction_thread_pool; - std::unique_ptr _small_compaction_thread_pool; + std::unique_ptr _quick_compaction_thread_pool; scoped_refptr _alpha_rowset_scan_thread; std::unique_ptr _convert_rowset_thread_pool; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index f6751d8ad0da3b..d47d5250b8ea9a 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -873,16 +873,19 @@ void Tablet::calculate_cumulative_point() { set_cumulative_layer_point(ret_cumulative_point); } -//find rowsets that rows less then "config::small_compaction_max_rows" -Status Tablet::pick_small_verson_rowsets(std::vector* input_rowsets, - int64_t* permits) { +//find rowsets that rows less then "config::quick_compaction_max_rows" +Status Tablet::pick_quick_compaction_rowsets(std::vector* input_rowsets, + int64_t* permits) { + int max_rows = config::quick_compaction_max_rows; + if (!config::enable_quick_compaction || max_rows <= 0) { + return Status::OK(); + } if (!init_succeeded()) { return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS); } int max_series_num = 1000; - int max_rows = config::small_compaction_max_rows; - if (max_rows <= 0) return Status::OK(); - std::vector> small_version_rowsets(max_series_num); + + std::vector> quick_compaction_rowsets(max_series_num); int idx = 0; std::shared_lock rdlock(_meta_lock); std::vector sortedRowset; @@ -896,7 +899,7 @@ Status Tablet::pick_small_verson_rowsets(std::vector* input_row if (!is_delete && sortedRowset[i]->start_version() > 0 && sortedRowset[i]->start_version() > cumulative_layer_point()) { if (sortedRowset[i]->num_rows() < max_rows) { - small_version_rowsets[idx].push_back(sortedRowset[i]); + quick_compaction_rowsets[idx].push_back(sortedRowset[i]); } else { idx++; if (idx > max_series_num) { @@ -905,11 +908,11 @@ Status Tablet::pick_small_verson_rowsets(std::vector* input_row } } } - if (small_version_rowsets.size() == 0) return Status::OK(); - std::vector result = small_version_rowsets[0]; - for (int i = 0; i < small_version_rowsets.size(); i++) { - if (small_version_rowsets[i].size() > result.size()) { - result = small_version_rowsets[i]; + if (quick_compaction_rowsets.size() == 0) return Status::OK(); + std::vector result = quick_compaction_rowsets[0]; + for (int i = 0; i < quick_compaction_rowsets.size(); i++) { + if (quick_compaction_rowsets[i].size() > result.size()) { + result = quick_compaction_rowsets[i]; } } for (int i = 0; i < result.size(); i++) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index ab6203dbedfa58..e3c5570bc2fa66 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -72,7 +72,8 @@ class Tablet : public BaseTablet { // Used in clone task, to update local meta when finishing a clone job Status revise_tablet_meta(const std::vector& rowsets_to_clone, const std::vector& versions_to_delete); - Status pick_small_verson_rowsets(std::vector* input_rowsets, int64_t* permits); + Status pick_quick_compaction_rowsets(std::vector* input_rowsets, + int64_t* permits); const int64_t cumulative_layer_point() const; void set_cumulative_layer_point(int64_t new_point); @@ -190,8 +191,8 @@ class Tablet : public BaseTablet { _last_cumu_compaction_success_millis = millis; } - void set_last_small_compaction_success_time(int64_t millis) { - _last_small_compaction_success_time_millis = millis; + void set_last_quick_compaction_success_time(int64_t millis) { + _last_quick_compaction_success_time_millis = millis; } int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; } @@ -340,7 +341,7 @@ class Tablet : public BaseTablet { std::atomic _last_cumu_compaction_success_millis; // timestamp of last base compaction success std::atomic _last_base_compaction_success_millis; - std::atomic _last_small_compaction_success_time_millis; + std::atomic _last_quick_compaction_success_time_millis; std::atomic _cumulative_point; std::atomic _newly_created_rowset_num; std::atomic _last_checkpoint_time;