From 7351a059ceb1f61073e2d617cc5056a1afa17aa7 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Tue, 11 Jun 2019 16:51:40 +0800 Subject: [PATCH] Call delete old files when all dir convert successfully --- be/src/olap/data_dir.cpp | 51 ++++++++++-------------- be/src/olap/data_dir.h | 11 +++++- be/src/olap/rowset/segment_group.cpp | 58 ++++++++++++++++++++++++++-- be/src/olap/rowset/segment_group.h | 6 +++ be/src/olap/storage_engine.cpp | 24 ++++++++++++ 5 files changed, 115 insertions(+), 35 deletions(-) diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index e7b0dfec907299..af150af528f9b1 100755 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -563,7 +563,7 @@ OLAPStatus DataDir::_convert_old_tablet() { bool parsed = olap_header_msg.ParseFromString(value); if (!parsed) { LOG(FATAL) << "convert olap header to tablet meta failed when load olap header tablet=" - << tablet_id << "." << schema_hash; + << tablet_id << "." << schema_hash; return false; } string old_data_path_prefix = get_absolute_tablet_path(olap_header_msg, true); @@ -612,16 +612,10 @@ OLAPStatus DataDir::_convert_old_tablet() { return OLAP_SUCCESS; } -OLAPStatus DataDir::_remove_old_meta_and_files(const std::set& tablet_ids) { +OLAPStatus DataDir::remove_old_meta_and_files() { // clean old meta(olap header message) - auto clean_old_meta_func = [this, &tablet_ids](int64_t tablet_id, + auto clean_old_meta_func = [this](int64_t tablet_id, int32_t schema_hash, const std::string& value) -> bool { - if (tablet_ids.find(tablet_id) == tablet_ids.end()) { - LOG(WARNING) << "tablet not load successfully, skip clean meta for tablet=" - << tablet_id << "." << schema_hash - << " from data dir: " << _path; - return true; - } TabletMetaManager::remove(this, tablet_id, schema_hash, OLD_HEADER_PREFIX); LOG(INFO) << "successfully clean old tablet meta(olap header) for tablet=" << tablet_id << "." << schema_hash @@ -638,14 +632,8 @@ OLAPStatus DataDir::_remove_old_meta_and_files(const std::set& tablet_i } // clean old files because they have hard links in new file name format - auto clean_old_files_func = [this, &tablet_ids](int64_t tablet_id, + auto clean_old_files_func = [this](int64_t tablet_id, int32_t schema_hash, const std::string& value) -> bool { - if (tablet_ids.find(tablet_id) == tablet_ids.end()) { - LOG(WARNING) << "tablet not load successfully, skip clean files for tablet=" - << tablet_id << "." << schema_hash - << " from data dir: " << _path; - return true; - } TabletMetaPB tablet_meta_pb; bool parsed = tablet_meta_pb.ParseFromString(value); if (!parsed) { @@ -697,6 +685,19 @@ OLAPStatus DataDir::_remove_old_meta_and_files(const std::set& tablet_i return OLAP_SUCCESS; } +bool DataDir::convert_old_data_success() { + return _convert_old_data_success; +} + +OLAPStatus DataDir::set_convert_finished() { + OLAPStatus res = _meta->set_tablet_convert_finished(); + if (res != OLAP_SUCCESS) { + LOG(FATAL) << "save convert flag failed after convert old tablet. dir=" << _path; + return res; + } + return OLAP_SUCCESS; +} + // TODO(ygl): deal with rowsets and tablets when load failed OLAPStatus DataDir::load() { // check if this is an old data path @@ -706,7 +707,7 @@ OLAPStatus DataDir::load() { LOG(WARNING) << "get convert flag from meta failed dir=" << _path; return res; } - bool should_remove_old_files = false; + _convert_old_data_success = false; if (!is_tablet_convert_finished) { _clean_unfinished_converting_data(); res = _convert_old_tablet(); @@ -714,18 +715,11 @@ OLAPStatus DataDir::load() { LOG(FATAL) << "convert old tablet failed for dir = " << _path; return res; } - // TODO(ygl): should load tablet successfully and then set convert flag and clean old files - res = _meta->set_tablet_convert_finished(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "save convert flag failed after convert old tablet. dir=" << _path; - return res; - } - // convert may be successfully, but crashed before remove old files - // depend on gc thread to recycle the old files - // _remove_old_meta_and_files(data_dir); - should_remove_old_files = true; + + _convert_old_data_success = true; } else { LOG(INFO) << "tablets have been converted, skip convert process"; + _convert_old_data_success = true; } LOG(INFO) << "start to load tablets from " << _path; @@ -853,9 +847,6 @@ OLAPStatus DataDir::load() { << " current valid tablet uid: " << tablet->tablet_uid(); } } - if (should_remove_old_files) { - _remove_old_meta_and_files(tablet_ids); - } return OLAP_SUCCESS; } diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index a05fcba4d1cc6e..40a7f084527f14 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -113,6 +113,13 @@ class DataDir { // this function will collect garbage paths scaned by last function void perform_path_gc(); + + OLAPStatus remove_old_meta_and_files(); + + bool convert_old_data_success(); + + OLAPStatus set_convert_finished(); + private: std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; } Status _init_cluster_id(); @@ -127,7 +134,6 @@ class DataDir { Status _write_cluster_id_to_path(const std::string& path, int32_t cluster_id); OLAPStatus _clean_unfinished_converting_data(); OLAPStatus _convert_old_tablet(); - OLAPStatus _remove_old_meta_and_files(const std::set& tablet_ids); void _remove_check_paths_no_lock(const std::set& paths); @@ -172,6 +178,9 @@ class DataDir { std::set _pending_path_ids; RWMutex _pending_path_mutex; + + // used in convert process + bool _convert_old_data_success; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_group.cpp b/be/src/olap/rowset/segment_group.cpp index ca07130d1185da..ad37f14c3047b8 100644 --- a/be/src/olap/rowset/segment_group.cpp +++ b/be/src/olap/rowset/segment_group.cpp @@ -797,11 +797,28 @@ OLAPStatus SegmentGroup::convert_to_old_files(const std::string& snapshot_path, OLAPStatus SegmentGroup::remove_old_files(std::vector* links_to_remove) { for (int segment_id = 0; segment_id < _num_segments; segment_id++) { std::string old_data_file_name = construct_old_data_file_path(_rowset_path_prefix, segment_id); - RETURN_NOT_OK(remove_dir(old_data_file_name)); - links_to_remove->push_back(old_data_file_name); + if (check_dir_existed(old_data_file_name)) { + RETURN_NOT_OK(remove_dir(old_data_file_name)); + links_to_remove->push_back(old_data_file_name); + } std::string old_index_file_name = construct_old_index_file_path(_rowset_path_prefix, segment_id); - RETURN_NOT_OK(remove_dir(old_index_file_name)); - links_to_remove->push_back(old_index_file_name); + if (check_dir_existed(old_index_file_name)) { + RETURN_NOT_OK(remove_dir(old_index_file_name)); + links_to_remove->push_back(old_index_file_name); + } + // if segment group id == 0, it maybe convert from old files which do not have segment group id in file path + if (_segment_group_id == 0) { + old_data_file_name = _construct_err_sg_data_file_path(_rowset_path_prefix, segment_id); + if (check_dir_existed(old_data_file_name)) { + RETURN_NOT_OK(remove_dir(old_data_file_name)); + links_to_remove->push_back(old_data_file_name); + } + old_index_file_name = _construct_err_sg_index_file_path(_rowset_path_prefix, segment_id); + if (check_dir_existed(old_index_file_name)) { + RETURN_NOT_OK(remove_dir(old_index_file_name)); + links_to_remove->push_back(old_index_file_name); + } + } } std::string pending_delta_path = _rowset_path_prefix + PENDING_DELTA_PREFIX; if (check_dir_existed(pending_delta_path)) { @@ -857,6 +874,22 @@ std::string SegmentGroup::construct_old_data_file_path(const std::string& path_p } } +std::string SegmentGroup::_construct_err_sg_index_file_path(const std::string& path_prefix, int32_t segment_id) const { + if (_is_pending) { + return _construct_old_pending_file_path(path_prefix, segment_id, ".idx"); + } else { + return _construct_err_sg_file_path(path_prefix, segment_id, ".idx"); + } +} + +std::string SegmentGroup::_construct_err_sg_data_file_path(const std::string& path_prefix, int32_t segment_id) const { + if (_is_pending) { + return _construct_old_pending_file_path(path_prefix, segment_id, ".dat"); + } else { + return _construct_err_sg_file_path(path_prefix, segment_id, ".dat"); + } +} + std::string SegmentGroup::_construct_old_pending_file_path(const std::string& path_prefix, int32_t segment_id, const std::string& suffix) const { std::stringstream file_path; @@ -895,4 +928,21 @@ std::string SegmentGroup::_construct_old_file_path(const std::string& path_prefi return file_path; } +// construct file path for sg_id == -1 +std::string SegmentGroup::_construct_err_sg_file_path(const std::string& path_prefix, int32_t segment_id, const std::string& suffix) const { + char file_path[OLAP_MAX_PATH_LEN]; + snprintf(file_path, + sizeof(file_path), + "%s/%ld_%ld_%ld_%ld_%d%s", + path_prefix.c_str(), + _tablet_id, + _version.first, + _version.second, + _version_hash, + segment_id, + suffix.c_str()); + + return file_path; +} + } // namespace doris diff --git a/be/src/olap/rowset/segment_group.h b/be/src/olap/rowset/segment_group.h index 94ece9ac585fb2..fa274d9ec65a54 100644 --- a/be/src/olap/rowset/segment_group.h +++ b/be/src/olap/rowset/segment_group.h @@ -273,6 +273,12 @@ class SegmentGroup { std::string _construct_old_file_path(const std::string& path_prefix, int32_t segment_id, const std::string& suffix) const; + std::string _construct_err_sg_file_path(const std::string& path_prefix, int32_t segment_id, const std::string& suffix) const; + + std::string _construct_err_sg_index_file_path(const std::string& path_prefix, int32_t segment_id) const; + + std::string _construct_err_sg_data_file_path(const std::string& path_prefix, int32_t segment_id) const; + private: int64_t _tablet_id; int64_t _rowset_id; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index bdd6a9bff03c13..96fb5be0ca3b87 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -136,6 +136,30 @@ void StorageEngine::load_data_dirs(const std::vector& data_dirs) { for (auto& thread : threads) { thread.join(); } + + // check whether all data dir convert successfully + for (auto data_dir : data_dirs) { + if (!data_dir->convert_old_data_success()) { + // if any dir convert failed, exit the process + LOG(FATAL) << "dir = " << data_dir->path() << "convert failed"; + } + } + + std::vector clean_old_file_threads; + for (auto data_dir : data_dirs) { + clean_old_file_threads.emplace_back([data_dir] { + data_dir->set_convert_finished(); + auto res = data_dir->remove_old_meta_and_files(); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to clean old files dir = " << data_dir->path() + << " res = " << res; + } + }); + } + + for (auto& thread : clean_old_file_threads) { + thread.join(); + } } OLAPStatus StorageEngine::open() {