diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 0a30aad20adb70..c1502c33c6a95f 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -691,11 +691,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature; std::vector error_tablet_ids; + std::vector succ_tablet_ids; uint32_t retry_time = 0; Status res = Status::OK(); while (retry_time < PUBLISH_VERSION_MAX_RETRY) { error_tablet_ids.clear(); - EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids); + EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, + &succ_tablet_ids); res = _env->storage_engine()->execute_task(&engine_task); if (res.ok()) { break; @@ -717,7 +719,29 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { << ", error_code=" << res; finish_task_request.__set_error_tablet_ids(error_tablet_ids); } else { - LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature; + int submit_tablets = 0; + if (config::enable_quick_compaction && config::quick_compaction_batch_size > 0) { + for (int i = 0; i < succ_tablet_ids.size(); i++) { + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet( + succ_tablet_ids[i]); + if (tablet != nullptr) { + submit_tablets++; + tablet->publised_count++; + if (tablet->publised_count % config::quick_compaction_batch_size == 0) { + StorageEngine::instance()->submit_quick_compaction_task(tablet); + LOG(INFO) << "trigger quick compaction succ, tabletid:" + << succ_tablet_ids[i] + << ", publised:" << tablet->publised_count; + } + } else { + LOG(WARNING) << "trigger quick compaction failed, tabletid:" + << succ_tablet_ids[i]; + } + } + LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature + << ", size:" << succ_tablet_ids.size(); + } } res.to_thrift(&finish_task_request.task_status); diff --git a/be/src/common/config.h b/be/src/common/config.h index fe40be22a8afb5..34a8c19a811b66 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -300,6 +300,9 @@ CONF_mInt32(convert_rowset_thread_num, "0"); // initial sleep interval in seconds of scan alpha rowset CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3"); +// This config can be set to limit thread number in smallcompaction thread pool. +CONF_mInt32(quick_compaction_max_threads, "10"); + // Thread count to do tablet meta checkpoint, -1 means use the data directories count. CONF_Int32(max_meta_checkpoint_threads, "-1"); @@ -741,6 +744,15 @@ CONF_Int32(parquet_reader_max_buffer_size, "50"); // if it is lower than a specific threshold, the predicate will be disabled. CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); +//whether turn on quick compaction feature +CONF_Bool(enable_quick_compaction, "false"); +// For continuous versions that rows less than quick_compaction_max_rows will trigger compaction quickly +CONF_Int32(quick_compaction_max_rows, "1000"); +// min compaction versions +CONF_Int32(quick_compaction_batch_size, "10"); +// do compaction min rowsets +CONF_Int32(quick_compaction_min_rowsets, "10"); + } // namespace config } // namespace doris diff --git a/be/src/common/status.h b/be/src/common/status.h index cca23836706924..58fdb7d2f453cf 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -24,7 +24,7 @@ namespace doris { #define APPLY_FOR_ERROR_CODES(M) \ M(OLAP_SUCCESS, 0, "", false) \ M(OLAP_ERR_OTHER_ERROR, -1, "", true) \ - M(OLAP_REQUEST_FAILED, -2, "", true) \ + M(OLAP_REQUEST_FAILED, -2, "", false) \ M(OLAP_ERR_OS_ERROR, -100, "", true) \ M(OLAP_ERR_DIR_NOT_EXIST, -101, "", true) \ M(OLAP_ERR_FILE_NOT_EXIST, -102, "", true) \ @@ -92,7 +92,7 @@ namespace doris { M(OLAP_ERR_CE_LOAD_TABLE_ERROR, -303, "", true) \ M(OLAP_ERR_CE_NOT_FINISHED, -304, "", true) \ M(OLAP_ERR_CE_TABLET_ID_EXIST, -305, "", true) \ - M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", true) \ + M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", false) \ M(OLAP_ERR_TABLE_VERSION_DUPLICATE_ERROR, -400, "", true) \ M(OLAP_ERR_TABLE_VERSION_INDEX_MISMATCH_ERROR, -401, "", true) \ M(OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR, -402, "", true) \ @@ -176,8 +176,8 @@ namespace doris { M(OLAP_ERR_HEADER_LOAD_JSON_HEADER, -1410, "", true) \ M(OLAP_ERR_HEADER_INIT_FAILED, -1411, "", true) \ M(OLAP_ERR_HEADER_PB_PARSE_FAILED, -1412, "", true) \ - M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", true) \ - M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", true) \ + M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", false) \ + M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", false) \ M(OLAP_ERR_SCHEMA_SCHEMA_FIELD_INVALID, -1501, "", true) \ M(OLAP_ERR_ALTER_MULTI_TABLE_ERR, -1600, "", true) \ M(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS, -1601, "", true) \ diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index c279ac7d62737b..3ac480194ef8c8 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -55,6 +55,57 @@ Status Compaction::execute_compact() { return st; } +Status Compaction::quick_rowsets_compact() { + std::unique_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); + if (!lock.owns_lock()) { + LOG(WARNING) << "The tablet is under cumulative compaction. tablet=" + << _tablet->full_name(); + return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR); + } + + // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked + // for compaction may change. In this case, current compaction task should not be executed. + if (_tablet->get_clone_occurred()) { + _tablet->set_clone_occurred(false); + return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED); + } + + _input_rowsets.clear(); + int version_count = _tablet->version_count(); + MonotonicStopWatch watch; + watch.start(); + int64_t permits = 0; + _tablet->pick_quick_compaction_rowsets(&_input_rowsets, &permits); + std::vector missedVersions; + find_longest_consecutive_version(&_input_rowsets, &missedVersions); + if (missedVersions.size() != 0) { + LOG(WARNING) << "quick_rowsets_compaction, find missed version" + << ",input_size:" << _input_rowsets.size(); + } + int nums = _input_rowsets.size(); + if (_input_rowsets.size() >= config::quick_compaction_min_rowsets) { + Status st = check_version_continuity(_input_rowsets); + if (!st.ok()) { + LOG(WARNING) << "quick_rowsets_compaction failed, cause version not continuous"; + return st; + } + st = do_compaction(permits); + if (!st.ok()) { + gc_output_rowset(); + LOG(WARNING) << "quick_rowsets_compaction failed"; + } else { + LOG(INFO) << "quick_compaction succ" + << ", before_versions:" << version_count + << ", after_versions:" << _tablet->version_count() + << ", cost:" << (watch.elapsed_time() / 1000 / 1000) << "ms" + << ", merged: " << nums << ", batch:" << config::quick_compaction_batch_size + << ", segments:" << permits << ", tabletid:" << _tablet->tablet_id(); + _tablet->set_last_quick_compaction_success_time(UnixMillis()); + } + } + return Status::OK(); +} + Status Compaction::do_compaction(int64_t permits) { TRACE("start to do compaction"); _tablet->data_dir()->disks_compaction_score_increment(permits); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index bd37cae275a642..c70a82defaab52 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -48,6 +48,7 @@ class Compaction { // This is only for http CompactionAction Status compact(); + Status quick_rowsets_compact(); virtual Status prepare_compact() = 0; Status execute_compact(); diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 132a79a390afd5..40f28516555c01 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -17,6 +17,8 @@ #include "olap/delta_writer.h" +#include "olap/base_compaction.h" +#include "olap/cumulative_compaction.h" #include "olap/data_dir.h" #include "olap/memtable.h" #include "olap/memtable_flush_executor.h" @@ -98,6 +100,10 @@ Status DeltaWriter::init() { MemTracker::create_tracker(-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id())); // check tablet version number if (_tablet->version_count() > config::max_tablet_version_num) { + //trigger quick compaction + if (config::enable_quick_compaction) { + StorageEngine::instance()->submit_quick_compaction_task(_tablet); + } LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count() << ", exceed limit: " << config::max_tablet_version_num << ". tablet: " << _tablet->full_name(); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 2ad264d2bcce0a..eb1d47c06559f9 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -88,6 +88,16 @@ Status StorageEngine::start_bg_threads() { LOG(INFO) << "alpha rowset scan thread started"; } + ThreadPoolBuilder("CompactionTaskThreadPool") + .set_min_threads(max_thread_num) + .set_max_threads(max_thread_num) + .build(&_compaction_thread_pool); + + ThreadPoolBuilder("SmallCompactionTaskThreadPool") + .set_min_threads(config::quick_compaction_max_threads) + .set_max_threads(config::quick_compaction_max_threads) + .build(&_quick_compaction_thread_pool); + // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( "StorageEngine", "compaction_tasks_producer_thread", @@ -661,4 +671,16 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, return _submit_compaction_task(tablet, compaction_type); } +Status StorageEngine::_handle_quick_compaction(TabletSharedPtr tablet) { + CumulativeCompaction compact(tablet); + compact.quick_rowsets_compact(); + return Status::OK(); +} + +Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) { + _quick_compaction_thread_pool->submit_func( + std::bind(&StorageEngine::_handle_quick_compaction, this, tablet)); + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index ce33223c5aaa1e..f13d007bfac1ba 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -193,6 +193,7 @@ class StorageEngine { void check_cumulative_compaction_config(); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); + Status submit_quick_compaction_task(TabletSharedPtr tablet); private: // Instance should be inited from `static open()` @@ -270,6 +271,8 @@ class StorageEngine { Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); + Status _handle_quick_compaction(TabletSharedPtr); + private: struct CompactionCandidate { CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) @@ -378,6 +381,7 @@ class StorageEngine { HeartbeatFlags* _heartbeat_flags; std::unique_ptr _compaction_thread_pool; + std::unique_ptr _quick_compaction_thread_pool; scoped_refptr _alpha_rowset_scan_thread; std::unique_ptr _convert_rowset_thread_pool; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index df7f02a1f971da..d47d5250b8ea9a 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -870,10 +870,59 @@ void Tablet::calculate_cumulative_point() { if (ret_cumulative_point == K_INVALID_CUMULATIVE_POINT) { return; } - set_cumulative_layer_point(ret_cumulative_point); } +//find rowsets that rows less then "config::quick_compaction_max_rows" +Status Tablet::pick_quick_compaction_rowsets(std::vector* input_rowsets, + int64_t* permits) { + int max_rows = config::quick_compaction_max_rows; + if (!config::enable_quick_compaction || max_rows <= 0) { + return Status::OK(); + } + if (!init_succeeded()) { + return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS); + } + int max_series_num = 1000; + + std::vector> quick_compaction_rowsets(max_series_num); + int idx = 0; + std::shared_lock rdlock(_meta_lock); + std::vector sortedRowset; + for (auto& rs : _rs_version_map) { + sortedRowset.push_back(rs.second); + } + std::sort(sortedRowset.begin(), sortedRowset.end(), Rowset::comparator); + if (tablet_state() == TABLET_RUNNING) { + for (int i = 0; i < sortedRowset.size(); i++) { + bool is_delete = version_for_delete_predicate(sortedRowset[i]->version()); + if (!is_delete && sortedRowset[i]->start_version() > 0 && + sortedRowset[i]->start_version() > cumulative_layer_point()) { + if (sortedRowset[i]->num_rows() < max_rows) { + quick_compaction_rowsets[idx].push_back(sortedRowset[i]); + } else { + idx++; + if (idx > max_series_num) { + break; + } + } + } + } + if (quick_compaction_rowsets.size() == 0) return Status::OK(); + std::vector result = quick_compaction_rowsets[0]; + for (int i = 0; i < quick_compaction_rowsets.size(); i++) { + if (quick_compaction_rowsets[i].size() > result.size()) { + result = quick_compaction_rowsets[i]; + } + } + for (int i = 0; i < result.size(); i++) { + *permits += result[i]->num_segments(); + input_rowsets->push_back(result[i]); + } + } + return Status::OK(); +} + Status Tablet::split_range(const OlapTuple& start_key_strings, const OlapTuple& end_key_strings, uint64_t request_block_row_count, std::vector* ranges) { DCHECK(ranges != nullptr); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index a7f1830ef13885..e3c5570bc2fa66 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -72,6 +72,8 @@ class Tablet : public BaseTablet { // Used in clone task, to update local meta when finishing a clone job Status revise_tablet_meta(const std::vector& rowsets_to_clone, const std::vector& versions_to_delete); + Status pick_quick_compaction_rowsets(std::vector* input_rowsets, + int64_t* permits); const int64_t cumulative_layer_point() const; void set_cumulative_layer_point(int64_t new_point); @@ -189,6 +191,10 @@ class Tablet : public BaseTablet { _last_cumu_compaction_success_millis = millis; } + void set_last_quick_compaction_success_time(int64_t millis) { + _last_quick_compaction_success_time_millis = millis; + } + int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; } void set_last_base_compaction_success_time(int64_t millis) { _last_base_compaction_success_millis = millis; @@ -335,7 +341,7 @@ class Tablet : public BaseTablet { std::atomic _last_cumu_compaction_success_millis; // timestamp of last base compaction success std::atomic _last_base_compaction_success_millis; - + std::atomic _last_quick_compaction_success_time_millis; std::atomic _cumulative_point; std::atomic _newly_created_rowset_num; std::atomic _last_checkpoint_time; @@ -362,6 +368,7 @@ class Tablet : public BaseTablet { public: IntCounter* flush_bytes; IntCounter* flush_count; + std::atomic publised_count = 0; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index de4d6449f2d549..5c7397c8ad63c1 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -28,8 +28,11 @@ namespace doris { using std::map; EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, - std::vector* error_tablet_ids) - : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids) {} + std::vector* error_tablet_ids, + std::vector* succ_tablet_ids) + : _publish_version_req(publish_version_req), + _error_tablet_ids(error_tablet_ids), + _succ_tablet_ids(succ_tablet_ids) {} Status EnginePublishVersionTask::finish() { Status res = Status::OK(); @@ -106,6 +109,9 @@ Status EnginePublishVersionTask::finish() { res = publish_status; continue; } + if (_succ_tablet_ids != nullptr) { + _succ_tablet_ids->push_back(tablet_info.tablet_id); + } partition_related_tablet_infos.erase(tablet_info); VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name() << ", transaction_id=" << transaction_id << ", version=" << version.first diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 2601ed8f60f389..4086f466d3da94 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -27,7 +27,8 @@ namespace doris { class EnginePublishVersionTask : public EngineTask { public: EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, - vector* error_tablet_ids); + vector* error_tablet_ids, + std::vector* succ_tablet_ids = nullptr); ~EnginePublishVersionTask() {} virtual Status finish() override; @@ -35,6 +36,7 @@ class EnginePublishVersionTask : public EngineTask { private: const TPublishVersionRequest& _publish_version_req; vector* _error_tablet_ids; + vector* _succ_tablet_ids; }; } // namespace doris