Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,20 @@ Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st
std::string compaction_type;
bool run_status = false;

{
// Full compaction holds both base compaction lock and cumu compaction lock.
// So we can not judge if full compaction is running by check these two locks holding.
// Here, we use a variable 'is_full_compaction_running' to check if full compaction is running.
if (tablet->is_full_compaction_running()) {
msg = "compaction task for this tablet is running";
compaction_type = "full";
run_status = true;
*json_result = strings::Substitute(json_template, run_status, msg, tablet_id,
compaction_type);
return Status::OK();
}
}

{
// use try lock to check this tablet is running cumulative compaction
std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(),
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ using namespace ErrorCode;
FullCompaction::FullCompaction(const TabletSharedPtr& tablet)
: Compaction(tablet, "FullCompaction:" + std::to_string(tablet->tablet_id())) {}

FullCompaction::~FullCompaction() {}
FullCompaction::~FullCompaction() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: use '= default' to define a trivial destructor [modernize-use-equals-default]

FullCompaction::~FullCompaction() {
                ^

_tablet->set_is_full_compaction_running(false);
}

Status FullCompaction::prepare_compact() {
if (!_tablet->init_succeeded()) {
Expand All @@ -53,10 +55,10 @@ Status FullCompaction::prepare_compact() {

std::unique_lock base_lock(_tablet->get_base_compaction_lock());
std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());
_tablet->set_is_full_compaction_running(true);

// 1. pick rowsets to compact
RETURN_IF_ERROR(pick_rowsets_to_compact());

return Status::OK();
}

Expand Down Expand Up @@ -115,6 +117,7 @@ Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) {
std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock());
RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); })
_tablet->save_meta();
}
return Status::OK();
Expand Down
12 changes: 10 additions & 2 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,11 +896,16 @@ bool StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table
.insert(tablet->tablet_id())
.second);
break;
default:
case CompactionType::BASE_COMPACTION:
already_existed = !(_tablet_submitted_base_compaction[tablet->data_dir()]
.insert(tablet->tablet_id())
.second);
break;
case CompactionType::FULL_COMPACTION:
already_existed = !(_tablet_submitted_full_compaction[tablet->data_dir()]
.insert(tablet->tablet_id())
.second);
break;
}
return already_existed;
}
Expand All @@ -913,9 +918,12 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
case CompactionType::CUMULATIVE_COMPACTION:
removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id());
break;
default:
case CompactionType::BASE_COMPACTION:
removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id());
break;
case CompactionType::FULL_COMPACTION:
removed = _tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id());
break;
}

if (removed == 1) {
Expand Down
24 changes: 24 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,30 @@ Status StorageEngine::get_compaction_status_json(std::string* result) {
}
root.AddMember(base_key, path_obj2, root.GetAllocator());

// full
const std::string& full = "FullCompaction";
rapidjson::Value full_key;
full_key.SetString(full.c_str(), full.length(), root.GetAllocator());
rapidjson::Document path_obj3;
path_obj3.SetObject();
for (auto& it : _tablet_submitted_full_compaction) {
const std::string& dir = it.first->path();
rapidjson::Value path_key;
path_key.SetString(dir.c_str(), dir.length(), path_obj3.GetAllocator());

rapidjson::Document arr;
arr.SetArray();

for (auto& tablet_id : it.second) {
rapidjson::Value key;
const std::string& key_str = std::to_string(tablet_id);
key.SetString(key_str.c_str(), key_str.length(), path_obj3.GetAllocator());
arr.PushBack(key, root.GetAllocator());
}
path_obj3.AddMember(path_key, arr, path_obj3.GetAllocator());
}
root.AddMember(full_key, path_obj3, root.GetAllocator());

rapidjson::StringBuffer strbuf;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
root.Accept(writer);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ class StorageEngine {
// a tablet can do base and cumulative compaction at same time
std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_cumu_compaction;
std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_base_compaction;
std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_full_compaction;

std::mutex _peer_replica_infos_mutex;
// key: tabletId
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1575,7 +1575,7 @@ void Tablet::get_compaction_status(std::string* json_result) {
root.AddMember("last base failure time", base_value, root.GetAllocator());
rapidjson::Value full_value;
format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
base_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator());
full_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator());
root.AddMember("last full failure time", full_value, root.GetAllocator());
rapidjson::Value cumu_success_value;
format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,10 @@ class Tablet : public BaseTablet {
void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
bool is_alter_failed() { return _alter_failed; }

void set_is_full_compaction_running(bool is_full_compaction_running) {
_is_full_compaction_running = is_full_compaction_running;
}
inline bool is_full_compaction_running() const { return _is_full_compaction_running; }
void clear_cache();

private:
Expand Down Expand Up @@ -743,6 +747,7 @@ class Tablet : public BaseTablet {
IntCounter* flush_bytes;
IntCounter* flush_finish_count;
std::atomic<int64_t> publised_count = 0;
std::atomic_bool _is_full_compaction_running = false;
};

inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_full_compaction_run_status","nonConcurrent") {


def tableName = "full_compaction_run_status_test"

// test successful group commit async load
sql """ DROP TABLE IF EXISTS ${tableName} """

String backend_id;

def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

backend_id = backendId_to_backendIP.keySet()[0]

sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k` int ,
`v` int ,
) engine=olap
DISTRIBUTED BY HASH(`k`)
BUCKETS 2
properties(
"replication_num" = "1",
"disable_auto_compaction" = "true")
"""

sql """ INSERT INTO ${tableName} VALUES (0,00)"""
sql """ INSERT INTO ${tableName} VALUES (1,10)"""
sql """ INSERT INTO ${tableName} VALUES (2,20)"""
sql """ INSERT INTO ${tableName} VALUES (3,30)"""
sql """ INSERT INTO ${tableName} VALUES (4,40)"""
sql """ INSERT INTO ${tableName} VALUES (5,50)"""
sql """ INSERT INTO ${tableName} VALUES (6,60)"""
sql """ INSERT INTO ${tableName} VALUES (7,70)"""
sql """ INSERT INTO ${tableName} VALUES (8,80)"""
sql """ INSERT INTO ${tableName} VALUES (9,90)"""

GetDebugPoint().clearDebugPointsForAllBEs()

def exception = false;
try {
GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep")
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId

times = 1
do{
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(1000)
} while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)

(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
assertTrue(compactJson.msg.toLowerCase().contains("is running"))
}
Thread.sleep(30000)
logger.info("sleep 30s to wait full compaction finish.")
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId

(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
assertTrue(compactJson.msg.toLowerCase().contains("is not running"))
}
} catch (Exception e) {
logger.info(e.getMessage())
exception = true;
} finally {
GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep")
assertFalse(exception)
}
}