Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,55 @@ void DataDir::remove_pending_ids(const std::string& id) {
_pending_path_ids.erase(id);
}

// gc unused tablet schemahash dir
void DataDir::perform_path_gc_by_tablet() {
std::unique_lock<std::mutex> lck(_check_path_mutex);
_cv.wait(lck, [this] { return _stop_bg_worker || !_all_tablet_schemahash_paths.empty(); });
if (_stop_bg_worker) {
return;
}
LOG(INFO) << "start to path gc by tablet schemahash.";
int counter = 0;
for (auto& path : _all_tablet_schemahash_paths) {
++counter;
if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) {
SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms));
}
TTabletId tablet_id = -1;
TSchemaHash schema_hash = -1;
bool is_valid = _tablet_manager->get_tablet_id_and_schema_hash_from_path(path, &tablet_id,
&schema_hash);
if (!is_valid) {
LOG(WARNING) << "unknown path:" << path;
continue;
}
// should not happen, because already check it is a valid tablet schema hash path in previous step
// so that log fatal here
if (tablet_id < 1 || schema_hash < 1) {
LOG(WARNING) << "invalid tablet id " << tablet_id << " or schema hash " << schema_hash
<< ", path=" << path;
continue;
}
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id, schema_hash);
if (tablet != nullptr) {
// could find the tablet, then skip check it
continue;
}
boost::filesystem::path tablet_path(path);
boost::filesystem::path data_dir_path =
tablet_path.parent_path().parent_path().parent_path().parent_path();
std::string data_dir_string = data_dir_path.string();
DataDir* data_dir = StorageEngine::instance()->get_store(data_dir_string);
if (data_dir == nullptr) {
LOG(WARNING) << "could not find data dir for tablet path " << path;
continue;
}
_tablet_manager->try_delete_unused_tablet_path(data_dir, tablet_id, schema_hash, path);
}
_all_tablet_schemahash_paths.clear();
LOG(INFO) << "finished one time path gc by tablet.";
}

void DataDir::perform_path_gc_by_rowsetid() {
// init the set of valid path
// validate the path in data dir
Expand Down Expand Up @@ -833,7 +882,6 @@ void DataDir::perform_path_scan() {
}
for (const auto& tablet_id : tablet_ids) {
std::string tablet_id_path = shard_path + "/" + tablet_id;
_all_check_paths.insert(tablet_id_path);
std::set<std::string> schema_hashes;
ret = FileUtils::list_dirs_files(tablet_id_path, &schema_hashes, nullptr,
Env::Default());
Expand All @@ -844,7 +892,7 @@ void DataDir::perform_path_scan() {
}
for (const auto& schema_hash : schema_hashes) {
std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash;
_all_check_paths.insert(tablet_schema_hash_path);
_all_tablet_schemahash_paths.insert(tablet_schema_hash_path);
std::set<std::string> rowset_files;

ret = FileUtils::list_dirs_files(tablet_schema_hash_path, nullptr,
Expand Down Expand Up @@ -879,15 +927,6 @@ bool DataDir::_check_pending_ids(const std::string& id) {
return _pending_path_ids.find(id) != _pending_path_ids.end();
}

void DataDir::_remove_check_paths_no_lock(const std::set<std::string>& paths) {
for (const auto& path : paths) {
auto path_iter = _all_check_paths.find(path);
if (path_iter != _all_check_paths.end()) {
_all_check_paths.erase(path_iter);
}
}
}

Status DataDir::update_capacity() {
try {
boost::filesystem::path path_name(_path);
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class DataDir {

void perform_path_gc_by_rowsetid();

void perform_path_gc_by_tablet();

OLAPStatus remove_old_meta_and_files();

bool convert_old_data_success();
Expand Down Expand Up @@ -138,8 +140,6 @@ class DataDir {
OLAPStatus _clean_unfinished_converting_data();
OLAPStatus _convert_old_tablet();

void _remove_check_paths_no_lock(const std::set<std::string>& paths);

void _process_garbage_path(const std::string& path);

void _remove_check_paths(const std::set<std::string>& paths);
Expand Down Expand Up @@ -182,6 +182,7 @@ class DataDir {
std::mutex _check_path_mutex;
std::condition_variable _cv;
std::set<std::string> _all_check_paths;
std::set<std::string> _all_tablet_schemahash_paths;

RWMutex _pending_path_mutex;
std::set<std::string> _pending_path_ids;
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,10 @@ void* StorageEngine::_path_gc_thread_callback(void* arg) {
LOG(INFO) << "try to start path gc thread!";

while (!_stop_bg_worker) {
LOG(INFO) << "try to perform path gc!";
LOG(INFO) << "try to perform path gc by tablet!";
((DataDir*)arg)->perform_path_gc_by_tablet();

LOG(INFO) << "try to perform path gc by rowsetid!";
// perform path gc by rowset id
((DataDir*)arg)->perform_path_gc_by_rowsetid();

Expand Down
54 changes: 54 additions & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,8 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema
bool TabletManager::get_tablet_id_and_schema_hash_from_path(
const string& path, TTabletId* tablet_id, TSchemaHash* schema_hash) {
static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)");
// match tablet schema hash data path, for example, the path is /data/1/16791/29998
// 1 is shard id , 16791 is tablet id, 29998 is schema hash
if (RE2::PartialMatch(path, normal_re, tablet_id, schema_hash)) {
return true;
}
Expand Down Expand Up @@ -812,6 +814,12 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab
return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
}

// check if the tablet path exists since the path maybe deleted by gc thread
if (!Env::Default()->path_exists(tablet->tablet_path()).ok()) {
LOG(WARNING) << "tablet path not exists, create tablet failed, path=" << tablet->tablet_path();
return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR;
}

if (tablet_meta->tablet_state() == TABLET_SHUTDOWN) {
LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id
<< " schema_hash=" << schema_hash << ", path=" << data_dir->path();
Expand Down Expand Up @@ -1086,6 +1094,52 @@ OLAPStatus TabletManager::start_trash_sweep() {
return OLAP_SUCCESS;
} // start_trash_sweep

void TabletManager::register_clone_tablet(int64_t tablet_id) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
WriteLock wlock(&tablet_map_lock);
_tablets_under_clone.insert(tablet_id);
}

void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
WriteLock wlock(&tablet_map_lock);
_tablets_under_clone.erase(tablet_id);
}

void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash, const string& schema_hash_path) {
// acquire the read lock, so that there is no creating tablet or load tablet from meta tasks
// create tablet and load tablet task should check whether the dir exists
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
ReadLock rlock(&tablet_map_lock);

// check if meta already exists
TabletMetaSharedPtr tablet_meta(new TabletMeta());
OLAPStatus check_st = TabletMetaManager::get_meta(data_dir, tablet_id,
schema_hash, tablet_meta);
if (check_st == OLAP_SUCCESS) {
LOG(INFO) << "tablet meta exist is meta store, skip delete the path " << schema_hash_path;
return;
}

if (_tablets_under_clone.count(tablet_id) > 0) {
LOG(INFO) << "tablet is under clone, skip delete the path " << schema_hash_path;
return;
}

// TODO(ygl): may do other checks in the future
if (Env::Default()->path_exists(schema_hash_path).ok()) {
LOG(INFO) << "start to move tablet to trash. tablet_path = " << schema_hash_path;
OLAPStatus rm_st = move_to_trash(schema_hash_path, schema_hash_path);
if (rm_st != OLAP_SUCCESS) {
LOG(WARNING) << "fail to move dir to trash. dir=" << schema_hash_path;
} else {
LOG(INFO) << "move path " << schema_hash_path << " to trash successfully";
}
}
return;
}

bool TabletManager::try_schema_change_lock(TTabletId tablet_id) {
bool res = false;
VLOG(3) << "try_schema_change_lock begin. tablet_id=" << tablet_id;
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class TabletManager {
// Prevent schema change executed concurrently.
bool try_schema_change_lock(TTabletId tablet_id);

void try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash, const string& schema_hash_path);

void update_root_path_info(std::map<std::string, DataDirInfo>* path_map,
size_t* tablet_counter);

Expand All @@ -134,6 +137,9 @@ class TabletManager {

void obtain_all_tablets(vector<TabletInfo> &tablets_info);

void register_clone_tablet(int64_t tablet_id);
void unregister_clone_tablet(int64_t tablet_id);

private:
// Add a tablet pointer to StorageEngine
// If force, drop the existing tablet add this new one
Expand Down Expand Up @@ -221,6 +227,9 @@ class TabletManager {
int64_t _last_update_stat_ms;

inline tablet_map_t& _get_tablet_map(TTabletId tablet_id);

std::set<int64_t> _tablets_under_clone;
std::set<int64_t> _tablets_under_restore;
};

inline RWMutex& TabletManager::_get_tablet_map_lock(TTabletId tabletId) {
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req,
_master_info(master_info) {}

OLAPStatus EngineCloneTask::execute() {
// register the tablet to avoid it is deleted by gc thread during clone process
StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id);
OLAPStatus st = _do_clone();
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
return st;
}

OLAPStatus EngineCloneTask::_do_clone() {
AgentStatus status = DORIS_SUCCESS;
string src_file_path;
TBackend src_host;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/task/engine_clone_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class EngineCloneTask : public EngineTask {

private:

OLAPStatus _do_clone();

virtual OLAPStatus _finish_clone(Tablet* tablet, const std::string& clone_dir,
int64_t committed_version, bool is_incremental_clone);

Expand Down