Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <sstream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -1586,7 +1587,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) &&
published_count % 20 == 0) {
auto st = _engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
<< ", published=" << published_count << " : " << st;
Expand Down
41 changes: 35 additions & 6 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,12 +854,12 @@ int StorageEngine::_get_executing_compaction_num(
return num;
}

bool need_generate_compaction_tasks(int count, int thread_per_disk, CompactionType compaction_type,
bool all_base) {
if (count >= thread_per_disk) {
bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk,
CompactionType compaction_type, bool all_base) {
if (task_cnt_per_disk >= thread_per_disk) {
// Return if no available slot
return false;
} else if (count >= thread_per_disk - 1) {
} else if (task_cnt_per_disk >= thread_per_disk - 1) {
// Only one slot left, check if it can be assigned to base compaction task.
if (compaction_type == CompactionType::BASE_COMPACTION) {
if (all_base) {
Expand Down Expand Up @@ -912,7 +912,7 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
copied_cumu_map = _tablet_submitted_cumu_compaction;
copied_base_map = _tablet_submitted_base_compaction;
}
for (auto data_dir : data_dirs) {
for (auto* data_dir : data_dirs) {
bool need_pick_tablet = true;
// We need to reserve at least one Slot for cumulative compaction.
// So when there is only one Slot, we have to judge whether there is a cumulative compaction
Expand Down Expand Up @@ -1091,7 +1091,36 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
}

Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
bool force) {
bool force, bool eager) {
if (!eager) {
DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
compaction_type == CompactionType::CUMULATIVE_COMPACTION);
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map;
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map;
{
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
copied_cumu_map = _tablet_submitted_cumu_compaction;
copied_base_map = _tablet_submitted_base_compaction;
}
auto stores = get_stores();

auto busy_pred = [&copied_cumu_map, &copied_base_map, compaction_type,
this](auto* data_dir) {
int count = _get_executing_compaction_num(copied_base_map[data_dir]) +
_get_executing_compaction_num(copied_cumu_map[data_dir]);
int paral = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk
: config::compaction_task_num_per_disk;
bool all_base = copied_cumu_map[data_dir].empty();
return need_generate_compaction_tasks(count, paral, compaction_type, all_base);
};

bool is_busy = std::none_of(stores.begin(), stores.end(), busy_pred);
if (is_busy) {
LOG_EVERY_N(WARNING, 100)
<< "Too busy to submit a compaction task, tablet=" << tablet->get_table_id();
return Status::OK();
}
}
_update_cumulative_compaction_policy();
// alter table tableName set ("compaction_policy"="time_series")
// if atler table's compaction policy, we need to modify tablet compaction policy shared ptr
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class StorageEngine {
void check_cumulative_compaction_config();

Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
bool force);
bool force, bool eager = true);
Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
SegCompactionCandidatesSharedPtr segments);

Expand Down