Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -691,11 +691,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;

std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
uint32_t retry_time = 0;
Status res = Status::OK();
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
error_tablet_ids.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
&succ_tablet_ids);
res = _env->storage_engine()->execute_task(&engine_task);
if (res.ok()) {
break;
Expand All @@ -717,7 +719,29 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
<< ", error_code=" << res;
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
} else {
LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature;
int submit_tablets = 0;
if (config::enable_quick_compaction && config::quick_compaction_batch_size > 0) {
for (int i = 0; i < succ_tablet_ids.size(); i++) {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
succ_tablet_ids[i]);
if (tablet != nullptr) {
submit_tablets++;
tablet->publised_count++;
if (tablet->publised_count % config::quick_compaction_batch_size == 0) {
StorageEngine::instance()->submit_quick_compaction_task(tablet);
LOG(INFO) << "trigger quick compaction succ, tabletid:"
<< succ_tablet_ids[i]
<< ", publised:" << tablet->publised_count;
}
} else {
LOG(WARNING) << "trigger quick compaction failed, tabletid:"
<< succ_tablet_ids[i];
}
}
LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature
<< ", size:" << succ_tablet_ids.size();
}
}

res.to_thrift(&finish_task_request.task_status);
Expand Down
12 changes: 12 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ CONF_mInt32(convert_rowset_thread_num, "0");
// initial sleep interval in seconds of scan alpha rowset
CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3");

// This config can be set to limit thread number in smallcompaction thread pool.
CONF_mInt32(quick_compaction_max_threads, "10");

// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");

Expand Down Expand Up @@ -741,6 +744,15 @@ CONF_Int32(parquet_reader_max_buffer_size, "50");
// if it is lower than a specific threshold, the predicate will be disabled.
CONF_mInt32(bloom_filter_predicate_check_row_num, "1000");

//whether turn on quick compaction feature
CONF_Bool(enable_quick_compaction, "false");
// For continuous versions that rows less than quick_compaction_max_rows will trigger compaction quickly
CONF_Int32(quick_compaction_max_rows, "1000");
// min compaction versions
CONF_Int32(quick_compaction_batch_size, "10");
// do compaction min rowsets
CONF_Int32(quick_compaction_min_rowsets, "10");

} // namespace config

} // namespace doris
8 changes: 4 additions & 4 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace doris {
#define APPLY_FOR_ERROR_CODES(M) \
M(OLAP_SUCCESS, 0, "", false) \
M(OLAP_ERR_OTHER_ERROR, -1, "", true) \
M(OLAP_REQUEST_FAILED, -2, "", true) \
M(OLAP_REQUEST_FAILED, -2, "", false) \
M(OLAP_ERR_OS_ERROR, -100, "", true) \
M(OLAP_ERR_DIR_NOT_EXIST, -101, "", true) \
M(OLAP_ERR_FILE_NOT_EXIST, -102, "", true) \
Expand Down Expand Up @@ -92,7 +92,7 @@ namespace doris {
M(OLAP_ERR_CE_LOAD_TABLE_ERROR, -303, "", true) \
M(OLAP_ERR_CE_NOT_FINISHED, -304, "", true) \
M(OLAP_ERR_CE_TABLET_ID_EXIST, -305, "", true) \
M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", true) \
M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", false) \
M(OLAP_ERR_TABLE_VERSION_DUPLICATE_ERROR, -400, "", true) \
M(OLAP_ERR_TABLE_VERSION_INDEX_MISMATCH_ERROR, -401, "", true) \
M(OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR, -402, "", true) \
Expand Down Expand Up @@ -176,8 +176,8 @@ namespace doris {
M(OLAP_ERR_HEADER_LOAD_JSON_HEADER, -1410, "", true) \
M(OLAP_ERR_HEADER_INIT_FAILED, -1411, "", true) \
M(OLAP_ERR_HEADER_PB_PARSE_FAILED, -1412, "", true) \
M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", true) \
M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", true) \
M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", false) \
M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", false) \
M(OLAP_ERR_SCHEMA_SCHEMA_FIELD_INVALID, -1501, "", true) \
M(OLAP_ERR_ALTER_MULTI_TABLE_ERR, -1600, "", true) \
M(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS, -1601, "", true) \
Expand Down
51 changes: 51 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,57 @@ Status Compaction::execute_compact() {
return st;
}

Status Compaction::quick_rowsets_compact() {
std::unique_lock<std::mutex> lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
LOG(WARNING) << "The tablet is under cumulative compaction. tablet="
<< _tablet->full_name();
return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR);
}

// Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
// for compaction may change. In this case, current compaction task should not be executed.
if (_tablet->get_clone_occurred()) {
_tablet->set_clone_occurred(false);
return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
}

_input_rowsets.clear();
int version_count = _tablet->version_count();
MonotonicStopWatch watch;
watch.start();
int64_t permits = 0;
_tablet->pick_quick_compaction_rowsets(&_input_rowsets, &permits);
std::vector<Version> missedVersions;
find_longest_consecutive_version(&_input_rowsets, &missedVersions);
if (missedVersions.size() != 0) {
LOG(WARNING) << "quick_rowsets_compaction, find missed version"
<< ",input_size:" << _input_rowsets.size();
}
int nums = _input_rowsets.size();
if (_input_rowsets.size() >= config::quick_compaction_min_rowsets) {
Status st = check_version_continuity(_input_rowsets);
if (!st.ok()) {
LOG(WARNING) << "quick_rowsets_compaction failed, cause version not continuous";
return st;
}
st = do_compaction(permits);
if (!st.ok()) {
gc_output_rowset();
LOG(WARNING) << "quick_rowsets_compaction failed";
} else {
LOG(INFO) << "quick_compaction succ"
<< ", before_versions:" << version_count
<< ", after_versions:" << _tablet->version_count()
<< ", cost:" << (watch.elapsed_time() / 1000 / 1000) << "ms"
<< ", merged: " << nums << ", batch:" << config::quick_compaction_batch_size
<< ", segments:" << permits << ", tabletid:" << _tablet->tablet_id();
_tablet->set_last_quick_compaction_success_time(UnixMillis());
}
}
return Status::OK();
}

Status Compaction::do_compaction(int64_t permits) {
TRACE("start to do compaction");
_tablet->data_dir()->disks_compaction_score_increment(permits);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Compaction {

// This is only for http CompactionAction
Status compact();
Status quick_rowsets_compact();

virtual Status prepare_compact() = 0;
Status execute_compact();
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "olap/delta_writer.h"

#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
Expand Down Expand Up @@ -98,6 +100,10 @@ Status DeltaWriter::init() {
MemTracker::create_tracker(-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id()));
// check tablet version number
if (_tablet->version_count() > config::max_tablet_version_num) {
//trigger quick compaction
if (config::enable_quick_compaction) {
StorageEngine::instance()->submit_quick_compaction_task(_tablet);
}
LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count()
<< ", exceed limit: " << config::max_tablet_version_num
<< ". tablet: " << _tablet->full_name();
Expand Down
22 changes: 22 additions & 0 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ Status StorageEngine::start_bg_threads() {
LOG(INFO) << "alpha rowset scan thread started";
}

ThreadPoolBuilder("CompactionTaskThreadPool")
.set_min_threads(max_thread_num)
.set_max_threads(max_thread_num)
.build(&_compaction_thread_pool);

ThreadPoolBuilder("SmallCompactionTaskThreadPool")
.set_min_threads(config::quick_compaction_max_threads)
.set_max_threads(config::quick_compaction_max_threads)
.build(&_quick_compaction_thread_pool);

// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "compaction_tasks_producer_thread",
Expand Down Expand Up @@ -661,4 +671,16 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet,
return _submit_compaction_task(tablet, compaction_type);
}

Status StorageEngine::_handle_quick_compaction(TabletSharedPtr tablet) {
CumulativeCompaction compact(tablet);
compact.quick_rowsets_compact();
return Status::OK();
}

Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
_quick_compaction_thread_pool->submit_func(
std::bind<void>(&StorageEngine::_handle_quick_compaction, this, tablet));
return Status::OK();
}

} // namespace doris
4 changes: 4 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class StorageEngine {
void check_cumulative_compaction_config();

Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
Status submit_quick_compaction_task(TabletSharedPtr tablet);

private:
// Instance should be inited from `static open()`
Expand Down Expand Up @@ -270,6 +271,8 @@ class StorageEngine {

Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);

Status _handle_quick_compaction(TabletSharedPtr);

private:
struct CompactionCandidate {
CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
Expand Down Expand Up @@ -378,6 +381,7 @@ class StorageEngine {
HeartbeatFlags* _heartbeat_flags;

std::unique_ptr<ThreadPool> _compaction_thread_pool;
std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;

scoped_refptr<Thread> _alpha_rowset_scan_thread;
std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;
Expand Down
51 changes: 50 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,10 +870,59 @@ void Tablet::calculate_cumulative_point() {
if (ret_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return;
}

set_cumulative_layer_point(ret_cumulative_point);
}

//find rowsets that rows less then "config::quick_compaction_max_rows"
Status Tablet::pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>* input_rowsets,
int64_t* permits) {
int max_rows = config::quick_compaction_max_rows;
if (!config::enable_quick_compaction || max_rows <= 0) {
return Status::OK();
}
if (!init_succeeded()) {
return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS);
}
int max_series_num = 1000;

std::vector<std::vector<RowsetSharedPtr>> quick_compaction_rowsets(max_series_num);
int idx = 0;
std::shared_lock rdlock(_meta_lock);
std::vector<RowsetSharedPtr> sortedRowset;
for (auto& rs : _rs_version_map) {
sortedRowset.push_back(rs.second);
}
std::sort(sortedRowset.begin(), sortedRowset.end(), Rowset::comparator);
if (tablet_state() == TABLET_RUNNING) {
for (int i = 0; i < sortedRowset.size(); i++) {
bool is_delete = version_for_delete_predicate(sortedRowset[i]->version());
if (!is_delete && sortedRowset[i]->start_version() > 0 &&
sortedRowset[i]->start_version() > cumulative_layer_point()) {
if (sortedRowset[i]->num_rows() < max_rows) {
quick_compaction_rowsets[idx].push_back(sortedRowset[i]);
} else {
idx++;
if (idx > max_series_num) {
break;
}
}
}
}
if (quick_compaction_rowsets.size() == 0) return Status::OK();
std::vector<RowsetSharedPtr> result = quick_compaction_rowsets[0];
for (int i = 0; i < quick_compaction_rowsets.size(); i++) {
if (quick_compaction_rowsets[i].size() > result.size()) {
result = quick_compaction_rowsets[i];
}
}
for (int i = 0; i < result.size(); i++) {
*permits += result[i]->num_segments();
input_rowsets->push_back(result[i]);
}
}
return Status::OK();
}

Status Tablet::split_range(const OlapTuple& start_key_strings, const OlapTuple& end_key_strings,
uint64_t request_block_row_count, std::vector<OlapTuple>* ranges) {
DCHECK(ranges != nullptr);
Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class Tablet : public BaseTablet {
// Used in clone task, to update local meta when finishing a clone job
Status revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
Status pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>* input_rowsets,
int64_t* permits);

const int64_t cumulative_layer_point() const;
void set_cumulative_layer_point(int64_t new_point);
Expand Down Expand Up @@ -189,6 +191,10 @@ class Tablet : public BaseTablet {
_last_cumu_compaction_success_millis = millis;
}

void set_last_quick_compaction_success_time(int64_t millis) {
_last_quick_compaction_success_time_millis = millis;
}

int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; }
void set_last_base_compaction_success_time(int64_t millis) {
_last_base_compaction_success_millis = millis;
Expand Down Expand Up @@ -335,7 +341,7 @@ class Tablet : public BaseTablet {
std::atomic<int64_t> _last_cumu_compaction_success_millis;
// timestamp of last base compaction success
std::atomic<int64_t> _last_base_compaction_success_millis;

std::atomic<int64_t> _last_quick_compaction_success_time_millis;
std::atomic<int64_t> _cumulative_point;
std::atomic<int32_t> _newly_created_rowset_num;
std::atomic<int64_t> _last_checkpoint_time;
Expand All @@ -362,6 +368,7 @@ class Tablet : public BaseTablet {
public:
IntCounter* flush_bytes;
IntCounter* flush_count;
std::atomic<int64_t> publised_count = 0;
};

inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
Expand Down
10 changes: 8 additions & 2 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ namespace doris {
using std::map;

EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
std::vector<TTabletId>* error_tablet_ids)
: _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids) {}
std::vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids)
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids) {}

Status EnginePublishVersionTask::finish() {
Status res = Status::OK();
Expand Down Expand Up @@ -106,6 +109,9 @@ Status EnginePublishVersionTask::finish() {
res = publish_status;
continue;
}
if (_succ_tablet_ids != nullptr) {
_succ_tablet_ids->push_back(tablet_info.tablet_id);
}
partition_related_tablet_infos.erase(tablet_info);
VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name()
<< ", transaction_id=" << transaction_id << ", version=" << version.first
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ namespace doris {
class EnginePublishVersionTask : public EngineTask {
public:
EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
vector<TTabletId>* error_tablet_ids);
vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids = nullptr);
~EnginePublishVersionTask() {}

virtual Status finish() override;

private:
const TPublishVersionRequest& _publish_version_req;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;
};

} // namespace doris
Expand Down